From 8daca099967e61ab53874674108089f697b38a61 Mon Sep 17 00:00:00 2001 From: Andrei Kandratovich Date: Thu, 12 May 2022 15:29:14 +0200 Subject: [PATCH 1/4] Respect requested message limits within a single MessageProducer in BinderTransport. --- binder/src/main/java/io/grpc/binder/internal/Inbound.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/binder/src/main/java/io/grpc/binder/internal/Inbound.java b/binder/src/main/java/io/grpc/binder/internal/Inbound.java index da1a2961546..5ab96085a41 100644 --- a/binder/src/main/java/io/grpc/binder/internal/Inbound.java +++ b/binder/src/main/java/io/grpc/binder/internal/Inbound.java @@ -468,7 +468,7 @@ public final synchronized InputStream next() { if (firstMessage != null) { stream = firstMessage; firstMessage = null; - } else if (messageAvailable()) { + } else if (numRequestedMessages > 0 && messageAvailable()) { stream = assembleNextMessage(); } if (stream != null) { From 0b4f1656ccf04756e944408c5d3437b76dca1dbe Mon Sep 17 00:00:00 2001 From: Andrei Kandratovich Date: Thu, 12 May 2022 15:31:35 +0200 Subject: [PATCH 2/4] Test for transport respects requested message limits within a single MessageProducer. --- .../grpc/internal/AbstractTransportTest.java | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/core/src/test/java/io/grpc/internal/AbstractTransportTest.java b/core/src/test/java/io/grpc/internal/AbstractTransportTest.java index 06dfef5daa8..a850dba0dfd 100644 --- a/core/src/test/java/io/grpc/internal/AbstractTransportTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractTransportTest.java @@ -1503,6 +1503,35 @@ private int verifyMessageCountAndClose(BlockingQueue messageQueue, return count; } + @Test + public void messageProducerOnlyProducesRequestedMessages() throws Exception { + server.start(serverListener); + client = newClientTransport(server); + startTransport(client, mockClientTransportListener); + MockServerTransportListener serverTransportListener = + serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); + serverTransport = serverTransportListener.transport; + + ClientStream clientStream = client.newStream( + methodDescriptor, new Metadata(), callOptions, tracers); + ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); + clientStream.start(clientStreamListener); + StreamCreation serverStreamCreation = + serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); + assertEquals(methodDescriptor.getFullMethodName(), serverStreamCreation.method); + ServerStream serverStream = serverStreamCreation.stream; + ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener; + + clientStream.writeMessage(methodDescriptor.streamRequest("MESSAGE")); + clientStream.writeMessage(methodDescriptor.streamRequest("MESSAGE")); + clientStream.flush(); + + serverStream.request(1); + + // Verify server only receives one message. + verifyMessageCountAndClose(serverStreamListener.messageQueue, 1); + } + @Test public void interactionsAfterServerStreamCloseAreNoops() throws Exception { server.start(serverListener); From f6ff5b3a674f1f4b0e0d1acf0b69c40cc0791220 Mon Sep 17 00:00:00 2001 From: Andrei Kandratovich Date: Thu, 12 May 2022 15:41:27 +0200 Subject: [PATCH 3/4] Refactor test --- .../java/io/grpc/internal/AbstractTransportTest.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/core/src/test/java/io/grpc/internal/AbstractTransportTest.java b/core/src/test/java/io/grpc/internal/AbstractTransportTest.java index a850dba0dfd..91e88e3f28c 100644 --- a/core/src/test/java/io/grpc/internal/AbstractTransportTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractTransportTest.java @@ -1512,6 +1512,7 @@ public void messageProducerOnlyProducesRequestedMessages() throws Exception { serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; + // Start an RPC. ClientStream clientStream = client.newStream( methodDescriptor, new Metadata(), callOptions, tracers); ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); @@ -1519,17 +1520,15 @@ public void messageProducerOnlyProducesRequestedMessages() throws Exception { StreamCreation serverStreamCreation = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); assertEquals(methodDescriptor.getFullMethodName(), serverStreamCreation.method); - ServerStream serverStream = serverStreamCreation.stream; - ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener; + // Have the client send two messages. clientStream.writeMessage(methodDescriptor.streamRequest("MESSAGE")); clientStream.writeMessage(methodDescriptor.streamRequest("MESSAGE")); clientStream.flush(); - serverStream.request(1); - - // Verify server only receives one message. - verifyMessageCountAndClose(serverStreamListener.messageQueue, 1); + // Verify server only receives one message if that's all it requests. + serverStreamCreation.stream.request(1); + verifyMessageCountAndClose(serverStreamCreation.listener.messageQueue, 1); } @Test From 34609fcde21c449830458fcdecd9a0ea50e92cab Mon Sep 17 00:00:00 2001 From: Andrei Kandratovich Date: Thu, 12 May 2022 18:14:55 +0200 Subject: [PATCH 4/4] Update AbstractTransportTest.java --- core/src/test/java/io/grpc/internal/AbstractTransportTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/test/java/io/grpc/internal/AbstractTransportTest.java b/core/src/test/java/io/grpc/internal/AbstractTransportTest.java index 91e88e3f28c..d70d8bc6b81 100644 --- a/core/src/test/java/io/grpc/internal/AbstractTransportTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractTransportTest.java @@ -1526,6 +1526,8 @@ public void messageProducerOnlyProducesRequestedMessages() throws Exception { clientStream.writeMessage(methodDescriptor.streamRequest("MESSAGE")); clientStream.flush(); + doPingPong(serverListener); + // Verify server only receives one message if that's all it requests. serverStreamCreation.stream.request(1); verifyMessageCountAndClose(serverStreamCreation.listener.messageQueue, 1);