diff --git a/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlClient.java b/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlClient.java
index f442de6d527..324392db150 100644
--- a/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlClient.java
+++ b/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlClient.java
@@ -54,7 +54,7 @@ public void beforeStart(final ClientCallStreamObserver requestStre
this.requestStream = requestStream;
// Set up manual flow control for the response stream. It feels backwards to configure the response
// stream's flow control using the request stream's observer, but this is the way it is.
- requestStream.disableAutoInboundFlowControl();
+ requestStream.disableAutoRequestWithInitial(1);
// Set up a back-pressure-aware producer for the request stream. The onReadyHandler will be invoked
// when the consuming side has enough buffer space to receive more messages.
diff --git a/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlServer.java b/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlServer.java
index 4ad59308437..b8fbff65a30 100644
--- a/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlServer.java
+++ b/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlServer.java
@@ -39,7 +39,7 @@ public StreamObserver sayHelloStreaming(final StreamObserver serverCallStreamObserver =
(ServerCallStreamObserver) responseObserver;
- serverCallStreamObserver.disableAutoInboundFlowControl();
+ serverCallStreamObserver.disableAutoRequestWithInitial(0);
// Set up a back-pressure-aware consumer for the request stream. The onReadyHandler will be invoked
// when the consuming side has enough buffer space to receive more messages.
diff --git a/services/src/main/java/io/grpc/protobuf/services/ProtoReflectionService.java b/services/src/main/java/io/grpc/protobuf/services/ProtoReflectionService.java
index beadb0f1eba..9be04d753f8 100644
--- a/services/src/main/java/io/grpc/protobuf/services/ProtoReflectionService.java
+++ b/services/src/main/java/io/grpc/protobuf/services/ProtoReflectionService.java
@@ -135,8 +135,7 @@ public StreamObserver serverReflectionInfo(
ProtoReflectionStreamObserver requestObserver =
new ProtoReflectionStreamObserver(updateIndexIfNecessary(), serverCallStreamObserver);
serverCallStreamObserver.setOnReadyHandler(requestObserver);
- serverCallStreamObserver.disableAutoInboundFlowControl();
- serverCallStreamObserver.request(1);
+ serverCallStreamObserver.disableAutoRequestWithInitial(1);
return requestObserver;
}
diff --git a/services/src/test/java/io/grpc/protobuf/services/ProtoReflectionServiceTest.java b/services/src/test/java/io/grpc/protobuf/services/ProtoReflectionServiceTest.java
index 00cc42ca3bd..8cd1041d36d 100644
--- a/services/src/test/java/io/grpc/protobuf/services/ProtoReflectionServiceTest.java
+++ b/services/src/test/java/io/grpc/protobuf/services/ProtoReflectionServiceTest.java
@@ -532,8 +532,11 @@ public void flowControl() throws Exception {
(ClientCallStreamObserver)
stub.serverReflectionInfo(clientResponseObserver);
- // ClientCalls.startCall() calls request(1) initially, so we should get an immediate response.
+ // Verify we don't receive a response until we request it.
requestObserver.onNext(flowControlRequest);
+ assertEquals(0, clientResponseObserver.getResponses().size());
+
+ requestObserver.request(1);
assertEquals(1, clientResponseObserver.getResponses().size());
assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(0));
@@ -557,17 +560,15 @@ public void flowControlOnCompleteWithPendingRequest() throws Exception {
(ClientCallStreamObserver)
stub.serverReflectionInfo(clientResponseObserver);
- // ClientCalls.startCall() calls request(1) initially, so make additional request.
- requestObserver.onNext(flowControlRequest);
requestObserver.onNext(flowControlRequest);
requestObserver.onCompleted();
- assertEquals(1, clientResponseObserver.getResponses().size());
+ assertEquals(0, clientResponseObserver.getResponses().size());
assertFalse(clientResponseObserver.onCompleteCalled());
requestObserver.request(1);
assertTrue(clientResponseObserver.onCompleteCalled());
- assertEquals(2, clientResponseObserver.getResponses().size());
- assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(1));
+ assertEquals(1, clientResponseObserver.getResponses().size());
+ assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(0));
}
private final ServerReflectionRequest flowControlRequest =
@@ -594,7 +595,7 @@ private static class FlowControlClientResponseObserver
@Override
public void beforeStart(final ClientCallStreamObserver requestStream) {
- requestStream.disableAutoInboundFlowControl();
+ requestStream.disableAutoRequestWithInitial(0);
}
@Override
diff --git a/stub/src/main/java/io/grpc/stub/CallStreamObserver.java b/stub/src/main/java/io/grpc/stub/CallStreamObserver.java
index 98fa6fba57e..d24f835aec4 100644
--- a/stub/src/main/java/io/grpc/stub/CallStreamObserver.java
+++ b/stub/src/main/java/io/grpc/stub/CallStreamObserver.java
@@ -106,9 +106,37 @@ public abstract class CallStreamObserver implements StreamObserver {
*
*
*
+ *
+ *
To migrate to {@link #disableAutoRequestWithInitial} on the server side, call
+ * {@code disableAutoRequestWithInitial(0)} as {@code disableAutoInboundFlowControl}
+ * already disables all inbound requests. On the client side, {@code
+ * disableAutoRequestWithInitial(1)} should be called to maintain existing behavior as
+ * {@code disableAutoInboundFlowControl} does not disable the initial request.
*/
public abstract void disableAutoInboundFlowControl();
+ /**
+ * Disables automatic flow control where an additional message is requested to be read after a
+ * call to the 'inbound' {@link io.grpc.stub.StreamObserver#onNext(Object)} has completed. A
+ * number of initial requests to make when the call is started may be specified.
+ *
+ *
On client-side this method may only be called during {@link
+ * ClientResponseObserver#beforeStart}. On server-side it may only be called during the initial
+ * call to the application, before the service returns its {@code StreamObserver}.
+ *
+ *
Note that for server-side cases where the message is received before the handler is invoked,
+ * this method will have no effect. This is true for:
+ *
+ *