diff --git a/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlClient.java b/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlClient.java index f442de6d527..324392db150 100644 --- a/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlClient.java +++ b/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlClient.java @@ -54,7 +54,7 @@ public void beforeStart(final ClientCallStreamObserver requestStre this.requestStream = requestStream; // Set up manual flow control for the response stream. It feels backwards to configure the response // stream's flow control using the request stream's observer, but this is the way it is. - requestStream.disableAutoInboundFlowControl(); + requestStream.disableAutoRequestWithInitial(1); // Set up a back-pressure-aware producer for the request stream. The onReadyHandler will be invoked // when the consuming side has enough buffer space to receive more messages. diff --git a/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlServer.java b/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlServer.java index 4ad59308437..b8fbff65a30 100644 --- a/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlServer.java +++ b/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlServer.java @@ -39,7 +39,7 @@ public StreamObserver sayHelloStreaming(final StreamObserver serverCallStreamObserver = (ServerCallStreamObserver) responseObserver; - serverCallStreamObserver.disableAutoInboundFlowControl(); + serverCallStreamObserver.disableAutoRequestWithInitial(0); // Set up a back-pressure-aware consumer for the request stream. The onReadyHandler will be invoked // when the consuming side has enough buffer space to receive more messages. diff --git a/services/src/main/java/io/grpc/protobuf/services/ProtoReflectionService.java b/services/src/main/java/io/grpc/protobuf/services/ProtoReflectionService.java index beadb0f1eba..9be04d753f8 100644 --- a/services/src/main/java/io/grpc/protobuf/services/ProtoReflectionService.java +++ b/services/src/main/java/io/grpc/protobuf/services/ProtoReflectionService.java @@ -135,8 +135,7 @@ public StreamObserver serverReflectionInfo( ProtoReflectionStreamObserver requestObserver = new ProtoReflectionStreamObserver(updateIndexIfNecessary(), serverCallStreamObserver); serverCallStreamObserver.setOnReadyHandler(requestObserver); - serverCallStreamObserver.disableAutoInboundFlowControl(); - serverCallStreamObserver.request(1); + serverCallStreamObserver.disableAutoRequestWithInitial(1); return requestObserver; } diff --git a/services/src/test/java/io/grpc/protobuf/services/ProtoReflectionServiceTest.java b/services/src/test/java/io/grpc/protobuf/services/ProtoReflectionServiceTest.java index 00cc42ca3bd..8cd1041d36d 100644 --- a/services/src/test/java/io/grpc/protobuf/services/ProtoReflectionServiceTest.java +++ b/services/src/test/java/io/grpc/protobuf/services/ProtoReflectionServiceTest.java @@ -532,8 +532,11 @@ public void flowControl() throws Exception { (ClientCallStreamObserver) stub.serverReflectionInfo(clientResponseObserver); - // ClientCalls.startCall() calls request(1) initially, so we should get an immediate response. + // Verify we don't receive a response until we request it. requestObserver.onNext(flowControlRequest); + assertEquals(0, clientResponseObserver.getResponses().size()); + + requestObserver.request(1); assertEquals(1, clientResponseObserver.getResponses().size()); assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(0)); @@ -557,17 +560,15 @@ public void flowControlOnCompleteWithPendingRequest() throws Exception { (ClientCallStreamObserver) stub.serverReflectionInfo(clientResponseObserver); - // ClientCalls.startCall() calls request(1) initially, so make additional request. - requestObserver.onNext(flowControlRequest); requestObserver.onNext(flowControlRequest); requestObserver.onCompleted(); - assertEquals(1, clientResponseObserver.getResponses().size()); + assertEquals(0, clientResponseObserver.getResponses().size()); assertFalse(clientResponseObserver.onCompleteCalled()); requestObserver.request(1); assertTrue(clientResponseObserver.onCompleteCalled()); - assertEquals(2, clientResponseObserver.getResponses().size()); - assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(1)); + assertEquals(1, clientResponseObserver.getResponses().size()); + assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(0)); } private final ServerReflectionRequest flowControlRequest = @@ -594,7 +595,7 @@ private static class FlowControlClientResponseObserver @Override public void beforeStart(final ClientCallStreamObserver requestStream) { - requestStream.disableAutoInboundFlowControl(); + requestStream.disableAutoRequestWithInitial(0); } @Override diff --git a/stub/src/main/java/io/grpc/stub/CallStreamObserver.java b/stub/src/main/java/io/grpc/stub/CallStreamObserver.java index 98fa6fba57e..d24f835aec4 100644 --- a/stub/src/main/java/io/grpc/stub/CallStreamObserver.java +++ b/stub/src/main/java/io/grpc/stub/CallStreamObserver.java @@ -106,9 +106,37 @@ public abstract class CallStreamObserver implements StreamObserver { * * *

+ * + *

To migrate to {@link #disableAutoRequestWithInitial} on the server side, call + * {@code disableAutoRequestWithInitial(0)} as {@code disableAutoInboundFlowControl} + * already disables all inbound requests. On the client side, {@code + * disableAutoRequestWithInitial(1)} should be called to maintain existing behavior as + * {@code disableAutoInboundFlowControl} does not disable the initial request. */ public abstract void disableAutoInboundFlowControl(); + /** + * Disables automatic flow control where an additional message is requested to be read after a + * call to the 'inbound' {@link io.grpc.stub.StreamObserver#onNext(Object)} has completed. A + * number of initial requests to make when the call is started may be specified. + * + *

On client-side this method may only be called during {@link + * ClientResponseObserver#beforeStart}. On server-side it may only be called during the initial + * call to the application, before the service returns its {@code StreamObserver}. + * + *

Note that for server-side cases where the message is received before the handler is invoked, + * this method will have no effect. This is true for: + * + *

    + *
  • {@link io.grpc.MethodDescriptor.MethodType#UNARY} operations.
  • + *
  • {@link io.grpc.MethodDescriptor.MethodType#SERVER_STREAMING} operations.
  • + *
+ *

+ * + *

This API is still a work in-progress and will likely change in the future. + */ + public abstract void disableAutoRequestWithInitial(int request); + /** * Requests the peer to produce {@code count} more messages to be delivered to the 'inbound' * {@link StreamObserver}. diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java index 82c370834c8..dcdce5471a1 100644 --- a/stub/src/main/java/io/grpc/stub/ClientCalls.java +++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java @@ -162,7 +162,7 @@ public static RespT blockingUnaryCall( public static Iterator blockingServerStreamingCall( ClientCall call, ReqT req) { BlockingResponseStream result = new BlockingResponseStream<>(call); - asyncUnaryRequestCall(call, req, result.listener(), true); + asyncUnaryRequestCall(call, req, result.listener()); return result; } @@ -179,7 +179,7 @@ public static Iterator blockingServerStreamingCall( ThreadlessExecutor executor = new ThreadlessExecutor(); ClientCall call = channel.newCall(method, callOptions.withExecutor(executor)); BlockingResponseStream result = new BlockingResponseStream<>(call, executor); - asyncUnaryRequestCall(call, req, result.listener(), true); + asyncUnaryRequestCall(call, req, result.listener()); return result; } @@ -193,7 +193,7 @@ public static Iterator blockingServerStreamingCall( public static ListenableFuture futureUnaryCall( ClientCall call, ReqT req) { GrpcFuture responseFuture = new GrpcFuture<>(call); - asyncUnaryRequestCall(call, req, new UnaryStreamToFuture<>(responseFuture), false); + asyncUnaryRequestCall(call, req, new UnaryStreamToFuture<>(responseFuture)); return responseFuture; } @@ -274,17 +274,14 @@ private static void asyncUnaryRequestCall( req, new StreamObserverToCallListenerAdapter<>( responseObserver, - new CallToStreamObserverAdapter<>(call), - streamingResponse), - streamingResponse); + new CallToStreamObserverAdapter<>(call, streamingResponse))); } private static void asyncUnaryRequestCall( ClientCall call, ReqT req, - ClientCall.Listener responseListener, - boolean streamingResponse) { - startCall(call, responseListener, streamingResponse); + StartableListener responseListener) { + startCall(call, responseListener); try { call.sendMessage(req); call.halfClose(); @@ -299,40 +296,39 @@ private static StreamObserver asyncStreamingRequestCall( ClientCall call, StreamObserver responseObserver, boolean streamingResponse) { - CallToStreamObserverAdapter adapter = new CallToStreamObserverAdapter<>(call); + CallToStreamObserverAdapter adapter = new CallToStreamObserverAdapter<>( + call, streamingResponse); startCall( call, - new StreamObserverToCallListenerAdapter<>( - responseObserver, adapter, streamingResponse), - streamingResponse); + new StreamObserverToCallListenerAdapter<>(responseObserver, adapter)); return adapter; } private static void startCall( ClientCall call, - ClientCall.Listener responseListener, - boolean streamingResponse) { + StartableListener responseListener) { call.start(responseListener, new Metadata()); - if (streamingResponse) { - call.request(1); - } else { - // Initially ask for two responses from flow-control so that if a misbehaving server sends - // more than one responses, we can catch it and fail it in the listener. - call.request(2); - } + responseListener.onStart(); + } + + private abstract static class StartableListener extends ClientCall.Listener { + abstract void onStart(); } private static final class CallToStreamObserverAdapter extends ClientCallStreamObserver { private boolean frozen; private final ClientCall call; + private final boolean streamingResponse; private Runnable onReadyHandler; - private boolean autoFlowControlEnabled = true; + private int initialRequest = 1; + private boolean autoRequestEnabled = true; private boolean aborted = false; private boolean completed = false; // Non private to avoid synthetic class - CallToStreamObserverAdapter(ClientCall call) { + CallToStreamObserverAdapter(ClientCall call, boolean streamingResponse) { this.call = call; + this.streamingResponse = streamingResponse; } private void freeze() { @@ -372,18 +368,32 @@ public void setOnReadyHandler(Runnable onReadyHandler) { this.onReadyHandler = onReadyHandler; } + @Deprecated @Override public void disableAutoInboundFlowControl() { + disableAutoRequestWithInitial(1); + } + + @Override + public void disableAutoRequestWithInitial(int request) { if (frozen) { throw new IllegalStateException( "Cannot disable auto flow control after call started. Use ClientResponseObserver"); } - autoFlowControlEnabled = false; + Preconditions.checkArgument(request >= 0, "Initial requests must be non-negative"); + initialRequest = request; + autoRequestEnabled = false; } @Override public void request(int count) { - call.request(count); + if (!streamingResponse && count == 1) { + // Initially ask for two responses from flow-control so that if a misbehaving server + // sends more than one responses, we can catch it and fail it in the listener. + call.request(2); + } else { + call.request(count); + } } @Override @@ -398,19 +408,16 @@ public void cancel(@Nullable String message, @Nullable Throwable cause) { } private static final class StreamObserverToCallListenerAdapter - extends ClientCall.Listener { + extends StartableListener { private final StreamObserver observer; private final CallToStreamObserverAdapter adapter; - private final boolean streamingResponse; private boolean firstResponseReceived; // Non private to avoid synthetic class StreamObserverToCallListenerAdapter( StreamObserver observer, - CallToStreamObserverAdapter adapter, - boolean streamingResponse) { + CallToStreamObserverAdapter adapter) { this.observer = observer; - this.streamingResponse = streamingResponse; this.adapter = adapter; if (observer instanceof ClientResponseObserver) { @SuppressWarnings("unchecked") @@ -427,7 +434,7 @@ public void onHeaders(Metadata headers) { @Override public void onMessage(RespT message) { - if (firstResponseReceived && !streamingResponse) { + if (firstResponseReceived && !adapter.streamingResponse) { throw Status.INTERNAL .withDescription("More than one responses received for unary or client-streaming call") .asRuntimeException(); @@ -435,7 +442,7 @@ public void onMessage(RespT message) { firstResponseReceived = true; observer.onNext(message); - if (streamingResponse && adapter.autoFlowControlEnabled) { + if (adapter.streamingResponse && adapter.autoRequestEnabled) { // Request delivery of the next inbound message. adapter.request(1); } @@ -456,12 +463,19 @@ public void onReady() { adapter.onReadyHandler.run(); } } + + @Override + void onStart() { + if (adapter.initialRequest > 0) { + adapter.request(adapter.initialRequest); + } + } } /** * Completes a {@link GrpcFuture} using {@link StreamObserver} events. */ - private static final class UnaryStreamToFuture extends ClientCall.Listener { + private static final class UnaryStreamToFuture extends StartableListener { private final GrpcFuture responseFuture; private RespT value; @@ -497,6 +511,11 @@ public void onClose(Status status, Metadata trailers) { responseFuture.setException(status.asRuntimeException(trailers)); } } + + @Override + void onStart() { + responseFuture.call.request(2); + } } private static final class GrpcFuture extends AbstractFuture { @@ -538,7 +557,7 @@ protected String pendingToString() { private static final class BlockingResponseStream implements Iterator { // Due to flow control, only needs to hold up to 2 items: 1 for value, 1 for close. private final BlockingQueue buffer = new ArrayBlockingQueue<>(2); - private final ClientCall.Listener listener = new QueuingListener(); + private final StartableListener listener = new QueuingListener(); private final ClientCall call; /** May be null. */ private final ThreadlessExecutor threadless; @@ -556,7 +575,7 @@ private static final class BlockingResponseStream implements Iterator { this.threadless = threadless; } - ClientCall.Listener listener() { + StartableListener listener() { return listener; } @@ -628,7 +647,7 @@ public void remove() { throw new UnsupportedOperationException(); } - private final class QueuingListener extends ClientCall.Listener { + private final class QueuingListener extends StartableListener { // Non private to avoid synthetic class QueuingListener() {} @@ -654,6 +673,11 @@ public void onClose(Status status, Metadata trailers) { } done = true; } + + @Override + void onStart() { + call.request(1); + } } } diff --git a/stub/src/main/java/io/grpc/stub/ClientResponseObserver.java b/stub/src/main/java/io/grpc/stub/ClientResponseObserver.java index 2e7ea35ebb2..5f5510f6d3c 100644 --- a/stub/src/main/java/io/grpc/stub/ClientResponseObserver.java +++ b/stub/src/main/java/io/grpc/stub/ClientResponseObserver.java @@ -30,8 +30,8 @@ public interface ClientResponseObserver extends StreamObserverOnly the methods {@link ClientCallStreamObserver#setOnReadyHandler(Runnable)} and - * {@link ClientCallStreamObserver#disableAutoInboundFlowControl()} may be called within this - * callback + * {@link ClientCallStreamObserver#disableAutoRequestWithInitial(int)} may be called within + * this callback * *
    *   // Copy an iterator to the request stream under flow-control
diff --git a/stub/src/main/java/io/grpc/stub/ServerCalls.java b/stub/src/main/java/io/grpc/stub/ServerCalls.java
index b37bf18e8fe..13dba8ea1d5 100644
--- a/stub/src/main/java/io/grpc/stub/ServerCalls.java
+++ b/stub/src/main/java/io/grpc/stub/ServerCalls.java
@@ -223,8 +223,8 @@ public ServerCall.Listener startCall(ServerCall call, Metadat
           new ServerCallStreamObserverImpl<>(call);
       StreamObserver requestObserver = method.invoke(responseObserver);
       responseObserver.freeze();
-      if (responseObserver.autoFlowControlEnabled) {
-        call.request(1);
+      if (responseObserver.initialRequest > 0) {
+        call.request(responseObserver.initialRequest);
       }
       return new StreamingServerCallListener(requestObserver, responseObserver, call);
     }
@@ -251,7 +251,7 @@ public void onMessage(ReqT request) {
         requestObserver.onNext(request);
 
         // Request delivery of the next inbound message.
-        if (responseObserver.autoFlowControlEnabled) {
+        if (responseObserver.autoRequestEnabled) {
           call.request(1);
         }
       }
@@ -308,7 +308,8 @@ private static final class ServerCallStreamObserverImpl
     final ServerCall call;
     volatile boolean cancelled;
     private boolean frozen;
-    private boolean autoFlowControlEnabled = true;
+    private int initialRequest = 1;
+    private boolean autoRequestEnabled = true;
     private boolean sentHeaders;
     private Runnable onReadyHandler;
     private Runnable onCancelHandler;
@@ -399,10 +400,18 @@ public void setOnCancelHandler(Runnable onCancelHandler) {
       this.onCancelHandler = onCancelHandler;
     }
 
+    @Deprecated
     @Override
     public void disableAutoInboundFlowControl() {
+      disableAutoRequestWithInitial(0);
+    }
+
+    @Override
+    public void disableAutoRequestWithInitial(int request) {
       checkState(!frozen, "Cannot disable auto flow control after initialization");
-      autoFlowControlEnabled = false;
+      Preconditions.checkArgument(request >= 0, "Initial requests must be non-negative");
+      initialRequest = request;
+      autoRequestEnabled = false;
     }
 
     @Override
diff --git a/stub/src/test/java/io/grpc/stub/ClientCallsTest.java b/stub/src/test/java/io/grpc/stub/ClientCallsTest.java
index a6364877063..b9c8ca0115f 100644
--- a/stub/src/test/java/io/grpc/stub/ClientCallsTest.java
+++ b/stub/src/test/java/io/grpc/stub/ClientCallsTest.java
@@ -354,6 +354,48 @@ public void onCompleted() {
     assertThat(requests).containsExactly(1);
   }
 
+  @Test
+  public void disablingAutoRequestSuppressesRequests()
+      throws Exception {
+    final AtomicReference> listener =
+        new AtomicReference<>();
+    final List requests = new ArrayList<>();
+    NoopClientCall call = new NoopClientCall() {
+      @Override
+      public void start(io.grpc.ClientCall.Listener responseListener, Metadata headers) {
+        listener.set(responseListener);
+      }
+
+      @Override
+      public void request(int numMessages) {
+        requests.add(numMessages);
+      }
+    };
+    ClientCalls.asyncBidiStreamingCall(call, new ClientResponseObserver() {
+      @Override
+      public void beforeStart(ClientCallStreamObserver requestStream) {
+        requestStream.disableAutoRequestWithInitial(0);
+      }
+
+      @Override
+      public void onNext(String value) {
+
+      }
+
+      @Override
+      public void onError(Throwable t) {
+
+      }
+
+      @Override
+      public void onCompleted() {
+
+      }
+    });
+    listener.get().onMessage("message");
+    assertThat(requests).isEmpty();
+  }
+
   @Test
   public void callStreamObserverPropagatesFlowControlRequestsToCall()
       throws Exception {
@@ -361,7 +403,7 @@ public void callStreamObserverPropagatesFlowControlRequestsToCall()
         new ClientResponseObserver() {
           @Override
           public void beforeStart(ClientCallStreamObserver requestStream) {
-            requestStream.disableAutoInboundFlowControl();
+            requestStream.disableAutoRequestWithInitial(0);
           }
 
           @Override
@@ -402,26 +444,34 @@ public void request(int numMessages) {
   public void canCaptureInboundFlowControlForServerStreamingObserver()
       throws Exception {
 
-    ClientResponseObserver responseObserver =
-        new ClientResponseObserver() {
-          @Override
-          public void beforeStart(ClientCallStreamObserver requestStream) {
-            requestStream.disableAutoInboundFlowControl();
-            requestStream.request(5);
-          }
+    class ResponseObserver implements ClientResponseObserver {
 
-          @Override
-          public void onNext(String value) {
-          }
+      private ClientCallStreamObserver requestStream;
 
-          @Override
-          public void onError(Throwable t) {
-          }
+      @Override
+      public void beforeStart(ClientCallStreamObserver requestStream) {
+        this.requestStream = requestStream;
+        requestStream.disableAutoRequestWithInitial(0);
+      }
 
-          @Override
-          public void onCompleted() {
-          }
-        };
+      @Override
+      public void onNext(String value) {
+      }
+
+      @Override
+      public void onError(Throwable t) {
+      }
+
+      @Override
+      public void onCompleted() {
+      }
+
+      void request(int numMessages) {
+        requestStream.request(numMessages);
+      }
+    }
+
+    ResponseObserver responseObserver = new ResponseObserver();
     final AtomicReference> listener =
         new AtomicReference<>();
     final List requests = new ArrayList<>();
@@ -437,8 +487,9 @@ public void request(int numMessages) {
       }
     };
     ClientCalls.asyncServerStreamingCall(call, 1, responseObserver);
+    responseObserver.request(5);
     listener.get().onMessage("message");
-    assertThat(requests).containsExactly(5, 1).inOrder();
+    assertThat(requests).containsExactly(5).inOrder();
   }
 
   @Test
@@ -486,7 +537,7 @@ public void onCompleted() {
         new ClientResponseObserver() {
           @Override
           public void beforeStart(final ClientCallStreamObserver requestStream) {
-            requestStream.disableAutoInboundFlowControl();
+            requestStream.disableAutoRequestWithInitial(0);
           }
 
           @Override
@@ -508,14 +559,15 @@ public void onCompleted() {
 
     CallStreamObserver integerStreamObserver = (CallStreamObserver)
         ClientCalls.asyncBidiStreamingCall(clientCall, responseObserver);
-    semaphore.acquire();
+    integerStreamObserver.request(1);
+    assertTrue(semaphore.tryAcquire(5, TimeUnit.SECONDS));
     integerStreamObserver.request(2);
-    semaphore.acquire();
+    assertTrue(semaphore.tryAcquire(5, TimeUnit.SECONDS));
     integerStreamObserver.request(3);
     integerStreamObserver.onCompleted();
     assertTrue(latch.await(5, TimeUnit.SECONDS));
     // Verify that number of messages produced in each onReady handler call matches the number
-    // requested by the client. Note that ClientCalls.asyncBidiStreamingCall will request(1)
+    // requested by the client.
     assertEquals(Arrays.asList(0, 1, 1, 2, 2, 2), receivedMessages);
   }
 
@@ -533,7 +585,7 @@ public void inprocessTransportOutboundFlowControl() throws Exception {
               public StreamObserver invoke(StreamObserver responseObserver) {
                 final ServerCallStreamObserver serverCallObserver =
                     (ServerCallStreamObserver) responseObserver;
-                serverCallObserver.disableAutoInboundFlowControl();
+                serverCallObserver.disableAutoRequestWithInitial(0);
                 observerFuture.set(serverCallObserver);
                 return new StreamObserver() {
                   @Override
diff --git a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java
index 04f88755429..775c86717d0 100644
--- a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java
+++ b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java
@@ -260,7 +260,7 @@ public void run() {
   }
 
   @Test
-  public void cannotDisableAutoFlowControlAfterServiceInvocation() throws Exception {
+  public void cannotDisableAutoRequestAfterServiceInvocation() throws Exception {
     final AtomicReference> callObserver =
         new AtomicReference<>();
     ServerCallHandler callHandler =
@@ -276,7 +276,7 @@ public StreamObserver invoke(StreamObserver responseObserver)
         callHandler.startCall(serverCall, new Metadata());
     callListener.onMessage(1);
     try {
-      callObserver.get().disableAutoInboundFlowControl();
+      callObserver.get().disableAutoRequestWithInitial(0);
       fail("Cannot set onCancel handler after service invocation");
     } catch (IllegalStateException expected) {
       // Expected
@@ -307,7 +307,30 @@ public StreamObserver invoke(StreamObserver responseObserver)
   }
 
   @Test
-  public void disablingInboundAutoFlowControlForUnaryHasNoEffect() throws Exception {
+  public void disablingInboundAutoRequestSuppressesRequestsForMoreMessages() throws Exception {
+    ServerCallHandler callHandler =
+        ServerCalls.asyncBidiStreamingCall(
+            new ServerCalls.BidiStreamingMethod() {
+              @Override
+              public StreamObserver invoke(StreamObserver responseObserver) {
+                ServerCallStreamObserver serverCallObserver =
+                    (ServerCallStreamObserver) responseObserver;
+                serverCallObserver.disableAutoRequestWithInitial(0);
+                return new ServerCalls.NoopStreamObserver<>();
+              }
+            });
+    ServerCall.Listener callListener =
+        callHandler.startCall(serverCall, new Metadata());
+    callListener.onReady();
+    // Transport should not call this if nothing has been requested but forcing it here
+    // to verify that message delivery does not trigger a call to request(1).
+    callListener.onMessage(1);
+    // Should never be called
+    assertThat(serverCall.requestCalls).isEmpty();
+  }
+
+  @Test
+  public void disablingInboundAutoRequestForUnaryHasNoEffect() throws Exception {
     ServerCallHandler callHandler =
         ServerCalls.asyncUnaryCall(
             new ServerCalls.UnaryMethod() {
@@ -315,7 +338,7 @@ public void disablingInboundAutoFlowControlForUnaryHasNoEffect() throws Exceptio
               public void invoke(Integer req, StreamObserver responseObserver) {
                 ServerCallStreamObserver serverCallObserver =
                     (ServerCallStreamObserver) responseObserver;
-                serverCallObserver.disableAutoInboundFlowControl();
+                serverCallObserver.disableAutoRequestWithInitial(0);
               }
             });
     callHandler.startCall(serverCall, new Metadata());