From 2c33e39f5dbd950cb9295b34a01e5e9ecb1bcb14 Mon Sep 17 00:00:00 2001 From: Andrei Kandratovich Date: Mon, 16 May 2022 18:23:16 +0200 Subject: [PATCH] binder: Respect requested message limits within a single MessageProducer --- .../java/io/grpc/binder/internal/Inbound.java | 2 +- .../grpc/internal/AbstractTransportTest.java | 30 +++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/binder/src/main/java/io/grpc/binder/internal/Inbound.java b/binder/src/main/java/io/grpc/binder/internal/Inbound.java index da1a2961546..5ab96085a41 100644 --- a/binder/src/main/java/io/grpc/binder/internal/Inbound.java +++ b/binder/src/main/java/io/grpc/binder/internal/Inbound.java @@ -468,7 +468,7 @@ public final synchronized InputStream next() { if (firstMessage != null) { stream = firstMessage; firstMessage = null; - } else if (messageAvailable()) { + } else if (numRequestedMessages > 0 && messageAvailable()) { stream = assembleNextMessage(); } if (stream != null) { diff --git a/core/src/test/java/io/grpc/internal/AbstractTransportTest.java b/core/src/test/java/io/grpc/internal/AbstractTransportTest.java index 06dfef5daa8..d70d8bc6b81 100644 --- a/core/src/test/java/io/grpc/internal/AbstractTransportTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractTransportTest.java @@ -1503,6 +1503,36 @@ private int verifyMessageCountAndClose(BlockingQueue messageQueue, return count; } + @Test + public void messageProducerOnlyProducesRequestedMessages() throws Exception { + server.start(serverListener); + client = newClientTransport(server); + startTransport(client, mockClientTransportListener); + MockServerTransportListener serverTransportListener = + serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); + serverTransport = serverTransportListener.transport; + + // Start an RPC. + ClientStream clientStream = client.newStream( + methodDescriptor, new Metadata(), callOptions, tracers); + ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); + clientStream.start(clientStreamListener); + StreamCreation serverStreamCreation = + serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); + assertEquals(methodDescriptor.getFullMethodName(), serverStreamCreation.method); + + // Have the client send two messages. + clientStream.writeMessage(methodDescriptor.streamRequest("MESSAGE")); + clientStream.writeMessage(methodDescriptor.streamRequest("MESSAGE")); + clientStream.flush(); + + doPingPong(serverListener); + + // Verify server only receives one message if that's all it requests. + serverStreamCreation.stream.request(1); + verifyMessageCountAndClose(serverStreamCreation.listener.messageQueue, 1); + } + @Test public void interactionsAfterServerStreamCloseAreNoops() throws Exception { server.start(serverListener);