From 87bb38aca11a85463f22615f2c42f4829269a245 Mon Sep 17 00:00:00 2001 From: DRayX <7531689+DRayX@users.noreply.github.com> Date: Wed, 4 Mar 2020 22:01:16 -0800 Subject: [PATCH 01/24] Update ClientCalls to disable initial flow-control The default behavior of requesting initial messages is applied even if disableAutoInboundFlowControl is called. ServerCalls disables all automatic flow control which is much more useful in case the user can't handle incoming messages until some time after the call has started. This change creates a new StartableListener that has an onStart method that is invoked when the call is started which makes initial requests if necessary. --- .../main/java/io/grpc/stub/ClientCalls.java | 67 ++++++++++++------- 1 file changed, 43 insertions(+), 24 deletions(-) diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java index 82c370834c8..0451a994c9b 100644 --- a/stub/src/main/java/io/grpc/stub/ClientCalls.java +++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java @@ -162,7 +162,7 @@ public static RespT blockingUnaryCall( public static Iterator blockingServerStreamingCall( ClientCall call, ReqT req) { BlockingResponseStream result = new BlockingResponseStream<>(call); - asyncUnaryRequestCall(call, req, result.listener(), true); + asyncUnaryRequestCall(call, req, result.listener()); return result; } @@ -179,7 +179,7 @@ public static Iterator blockingServerStreamingCall( ThreadlessExecutor executor = new ThreadlessExecutor(); ClientCall call = channel.newCall(method, callOptions.withExecutor(executor)); BlockingResponseStream result = new BlockingResponseStream<>(call, executor); - asyncUnaryRequestCall(call, req, result.listener(), true); + asyncUnaryRequestCall(call, req, result.listener()); return result; } @@ -193,7 +193,7 @@ public static Iterator blockingServerStreamingCall( public static ListenableFuture futureUnaryCall( ClientCall call, ReqT req) { GrpcFuture responseFuture = new GrpcFuture<>(call); - asyncUnaryRequestCall(call, req, new UnaryStreamToFuture<>(responseFuture), false); + asyncUnaryRequestCall(call, req, new UnaryStreamToFuture<>(responseFuture)); return responseFuture; } @@ -275,16 +275,14 @@ private static void asyncUnaryRequestCall( new StreamObserverToCallListenerAdapter<>( responseObserver, new CallToStreamObserverAdapter<>(call), - streamingResponse), - streamingResponse); + streamingResponse)); } private static void asyncUnaryRequestCall( ClientCall call, ReqT req, - ClientCall.Listener responseListener, - boolean streamingResponse) { - startCall(call, responseListener, streamingResponse); + StartableListener responseListener) { + startCall(call, responseListener); try { call.sendMessage(req); call.halfClose(); @@ -303,23 +301,19 @@ private static StreamObserver asyncStreamingRequestCall( startCall( call, new StreamObserverToCallListenerAdapter<>( - responseObserver, adapter, streamingResponse), - streamingResponse); + responseObserver, adapter, streamingResponse)); return adapter; } private static void startCall( ClientCall call, - ClientCall.Listener responseListener, - boolean streamingResponse) { + StartableListener responseListener) { call.start(responseListener, new Metadata()); - if (streamingResponse) { - call.request(1); - } else { - // Initially ask for two responses from flow-control so that if a misbehaving server sends - // more than one responses, we can catch it and fail it in the listener. - call.request(2); - } + responseListener.onStart(); + } + + private abstract static class StartableListener extends ClientCall.Listener { + abstract void onStart(); } private static final class CallToStreamObserverAdapter extends ClientCallStreamObserver { @@ -398,7 +392,7 @@ public void cancel(@Nullable String message, @Nullable Throwable cause) { } private static final class StreamObserverToCallListenerAdapter - extends ClientCall.Listener { + extends StartableListener { private final StreamObserver observer; private final CallToStreamObserverAdapter adapter; private final boolean streamingResponse; @@ -456,12 +450,23 @@ public void onReady() { adapter.onReadyHandler.run(); } } + + @Override + void onStart() { + if (adapter.autoFlowControlEnabled) { + if (streamingResponse) { + adapter.request(1); + } else { + adapter.request(2); + } + } + } } /** * Completes a {@link GrpcFuture} using {@link StreamObserver} events. */ - private static final class UnaryStreamToFuture extends ClientCall.Listener { + private static final class UnaryStreamToFuture extends StartableListener { private final GrpcFuture responseFuture; private RespT value; @@ -497,6 +502,11 @@ public void onClose(Status status, Metadata trailers) { responseFuture.setException(status.asRuntimeException(trailers)); } } + + @Override + void onStart() { + responseFuture.request(2); + } } private static final class GrpcFuture extends AbstractFuture { @@ -526,6 +536,10 @@ protected boolean setException(Throwable throwable) { protected String pendingToString() { return MoreObjects.toStringHelper(this).add("clientCall", call).toString(); } + + void request(int numMessages) { + call.request(numMessages); + } } /** @@ -538,7 +552,7 @@ protected String pendingToString() { private static final class BlockingResponseStream implements Iterator { // Due to flow control, only needs to hold up to 2 items: 1 for value, 1 for close. private final BlockingQueue buffer = new ArrayBlockingQueue<>(2); - private final ClientCall.Listener listener = new QueuingListener(); + private final StartableListener listener = new QueuingListener(); private final ClientCall call; /** May be null. */ private final ThreadlessExecutor threadless; @@ -556,7 +570,7 @@ private static final class BlockingResponseStream implements Iterator { this.threadless = threadless; } - ClientCall.Listener listener() { + StartableListener listener() { return listener; } @@ -628,7 +642,7 @@ public void remove() { throw new UnsupportedOperationException(); } - private final class QueuingListener extends ClientCall.Listener { + private final class QueuingListener extends StartableListener { // Non private to avoid synthetic class QueuingListener() {} @@ -654,6 +668,11 @@ public void onClose(Status status, Metadata trailers) { } done = true; } + + @Override + void onStart() { + call.request(1); + } } } From 86752b0ca77cd713bff117fdce1d364abd75003b Mon Sep 17 00:00:00 2001 From: DRayX <7531689+DRayX@users.noreply.github.com> Date: Wed, 4 Mar 2020 22:10:29 -0800 Subject: [PATCH 02/24] Update stub/src/main/java/io/grpc/stub/ClientCalls.java Delete accidental addtion --- stub/src/main/java/io/grpc/stub/ClientCalls.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java index 0451a994c9b..ef487357ebf 100644 --- a/stub/src/main/java/io/grpc/stub/ClientCalls.java +++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java @@ -466,7 +466,7 @@ void onStart() { /** * Completes a {@link GrpcFuture} using {@link StreamObserver} events. */ - private static final class UnaryStreamToFuture extends StartableListener { + private static final class UnaryStreamToFuture extends StartableListener { private final GrpcFuture responseFuture; private RespT value; From 7029ad70bb69bcdbd85d5685238f2a322510d5ec Mon Sep 17 00:00:00 2001 From: DRayX <7531689+DRayX@users.noreply.github.com> Date: Mon, 9 Mar 2020 14:39:47 -0700 Subject: [PATCH 03/24] Add new disableAutoRequest method Add a new disableAutoRequest method that disables all automatic requests while disableAutoInboundFlowControl maintains existing behavior. disableAutoInboundFlowControl is also marked as Deprecated. --- .../java/io/grpc/stub/CallStreamObserver.java | 24 +++++++++++++++++++ .../main/java/io/grpc/stub/ClientCalls.java | 20 ++++++++++++---- .../main/java/io/grpc/stub/ServerCalls.java | 13 ++++++---- 3 files changed, 49 insertions(+), 8 deletions(-) diff --git a/stub/src/main/java/io/grpc/stub/CallStreamObserver.java b/stub/src/main/java/io/grpc/stub/CallStreamObserver.java index 98fa6fba57e..07cf5769123 100644 --- a/stub/src/main/java/io/grpc/stub/CallStreamObserver.java +++ b/stub/src/main/java/io/grpc/stub/CallStreamObserver.java @@ -106,9 +106,33 @@ public abstract class CallStreamObserver implements StreamObserver { * * *

+ * + * @deprecated Use {@link #disableAutoRequest} instead. This method will be removed. */ + @Deprecated public abstract void disableAutoInboundFlowControl(); + /** + * Disables automatic flow control where initial tokens are requested when the call is started, + * and a token is returned to the peer after a call to the 'inbound' {@link + * io.grpc.stub.StreamObserver#onNext(Object)} has completed. If disabled an application must + * make explicit calls to {@link #request} to receive any messages. + * + *

On client-side this method may only be called during {@link + * ClientResponseObserver#beforeStart}. On server-side it may only be called during the initial + * call to the application, before the service returns its {@code StreamObserver}. + * + *

Note that for server-side cases where the message is recieved before the handler is invoked, + * this method will have no effect. This is true for: + * + *

    + *
  • {@link io.grpc.MethodDescriptor.MethodType#UNARY} operations.
  • + *
  • {@link io.grpc.MethodDescriptor.MethodType#SERVER_STREAMING} operations.
  • + *
+ *

+ */ + public abstract void disableAutoRequest(); + /** * Requests the peer to produce {@code count} more messages to be delivered to the 'inbound' * {@link StreamObserver}. diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java index ef487357ebf..0c6b85bbf31 100644 --- a/stub/src/main/java/io/grpc/stub/ClientCalls.java +++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java @@ -316,11 +316,17 @@ private abstract static class StartableListener extends ClientCall.Listener extends ClientCallStreamObserver { private boolean frozen; private final ClientCall call; private Runnable onReadyHandler; - private boolean autoFlowControlEnabled = true; + private AutoRequestMode autoRequestMode = AutoRequestMode.INITIAL_AND_NEXT; private boolean aborted = false; private boolean completed = false; @@ -372,7 +378,13 @@ public void disableAutoInboundFlowControl() { throw new IllegalStateException( "Cannot disable auto flow control after call started. Use ClientResponseObserver"); } - autoFlowControlEnabled = false; + autoRequestMode = AutoRequestMode.INITIAL_ONLY; + } + + @Override + public void disableAutoRequest() { + checkState(!frozen, "Cannot disable auto flow control after call started. Use ClientResponseObserver"); + autoRequestMode = AutoRequestMode.DISABLED; } @Override @@ -429,7 +441,7 @@ public void onMessage(RespT message) { firstResponseReceived = true; observer.onNext(message); - if (streamingResponse && adapter.autoFlowControlEnabled) { + if (streamingResponse && adapter.autoRequestMode == AutoRequestMode.INITIAL_AND_NEXT) { // Request delivery of the next inbound message. adapter.request(1); } @@ -453,7 +465,7 @@ public void onReady() { @Override void onStart() { - if (adapter.autoFlowControlEnabled) { + if (adapter.autoRequestMode != AutoRequestMode.DISABLED) { if (streamingResponse) { adapter.request(1); } else { diff --git a/stub/src/main/java/io/grpc/stub/ServerCalls.java b/stub/src/main/java/io/grpc/stub/ServerCalls.java index b37bf18e8fe..7c58d368847 100644 --- a/stub/src/main/java/io/grpc/stub/ServerCalls.java +++ b/stub/src/main/java/io/grpc/stub/ServerCalls.java @@ -223,7 +223,7 @@ public ServerCall.Listener startCall(ServerCall call, Metadat new ServerCallStreamObserverImpl<>(call); StreamObserver requestObserver = method.invoke(responseObserver); responseObserver.freeze(); - if (responseObserver.autoFlowControlEnabled) { + if (responseObserver.autoRequestEnabled) { call.request(1); } return new StreamingServerCallListener(requestObserver, responseObserver, call); @@ -251,7 +251,7 @@ public void onMessage(ReqT request) { requestObserver.onNext(request); // Request delivery of the next inbound message. - if (responseObserver.autoFlowControlEnabled) { + if (responseObserver.autoRequestEnabled) { call.request(1); } } @@ -308,7 +308,7 @@ private static final class ServerCallStreamObserverImpl final ServerCall call; volatile boolean cancelled; private boolean frozen; - private boolean autoFlowControlEnabled = true; + private boolean autoRequestEnabled = true; private boolean sentHeaders; private Runnable onReadyHandler; private Runnable onCancelHandler; @@ -401,8 +401,13 @@ public void setOnCancelHandler(Runnable onCancelHandler) { @Override public void disableAutoInboundFlowControl() { + disableAutoRequest(); + } + + @Override + public void disableAutoRequest() { checkState(!frozen, "Cannot disable auto flow control after initialization"); - autoFlowControlEnabled = false; + autoRequestEnabled = false; } @Override From 4fd1176ab01b98ba00b2fe98ee7a391a379dec2e Mon Sep 17 00:00:00 2001 From: DRayX <7531689+DRayX@users.noreply.github.com> Date: Mon, 9 Mar 2020 14:56:42 -0700 Subject: [PATCH 04/24] Add @Deprecated annotation to overrides --- stub/src/main/java/io/grpc/stub/ClientCalls.java | 1 + stub/src/main/java/io/grpc/stub/ServerCalls.java | 1 + 2 files changed, 2 insertions(+) diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java index 0c6b85bbf31..62ef3d492da 100644 --- a/stub/src/main/java/io/grpc/stub/ClientCalls.java +++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java @@ -372,6 +372,7 @@ public void setOnReadyHandler(Runnable onReadyHandler) { this.onReadyHandler = onReadyHandler; } + @Deprecated @Override public void disableAutoInboundFlowControl() { if (frozen) { diff --git a/stub/src/main/java/io/grpc/stub/ServerCalls.java b/stub/src/main/java/io/grpc/stub/ServerCalls.java index 7c58d368847..8e5ed1e1068 100644 --- a/stub/src/main/java/io/grpc/stub/ServerCalls.java +++ b/stub/src/main/java/io/grpc/stub/ServerCalls.java @@ -399,6 +399,7 @@ public void setOnCancelHandler(Runnable onCancelHandler) { this.onCancelHandler = onCancelHandler; } + @Deprecated @Override public void disableAutoInboundFlowControl() { disableAutoRequest(); From 2a036471fa21bac8d8de48364aee6483a9f99556 Mon Sep 17 00:00:00 2001 From: DRayX <7531689+DRayX@users.noreply.github.com> Date: Mon, 9 Mar 2020 15:43:07 -0700 Subject: [PATCH 05/24] Update ClientResponseObserver comment --- stub/src/main/java/io/grpc/stub/ClientResponseObserver.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/stub/src/main/java/io/grpc/stub/ClientResponseObserver.java b/stub/src/main/java/io/grpc/stub/ClientResponseObserver.java index 2e7ea35ebb2..e5557c51fbf 100644 --- a/stub/src/main/java/io/grpc/stub/ClientResponseObserver.java +++ b/stub/src/main/java/io/grpc/stub/ClientResponseObserver.java @@ -30,8 +30,7 @@ public interface ClientResponseObserver extends StreamObserverOnly the methods {@link ClientCallStreamObserver#setOnReadyHandler(Runnable)} and - * {@link ClientCallStreamObserver#disableAutoInboundFlowControl()} may be called within this - * callback + * {@link ClientCallStreamObserver#disableAutoRequest()} may be called within this callback * *
    *   // Copy an iterator to the request stream under flow-control

From 07bb0826830d93e681247db9f4ff1940bd45e3d6 Mon Sep 17 00:00:00 2001
From: DRayX <7531689+DRayX@users.noreply.github.com>
Date: Mon, 9 Mar 2020 15:47:49 -0700
Subject: [PATCH 06/24] Update Manual Flow Control examples

Update the manual flow control examples to use the new disableAutoRequest method.  Note that the ManualFlowControlClient now needs to request one message after the call has been started.
---
 .../examples/manualflowcontrol/ManualFlowControlClient.java | 6 ++++--
 .../examples/manualflowcontrol/ManualFlowControlServer.java | 2 +-
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlClient.java b/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlClient.java
index f442de6d527..3def96f136a 100644
--- a/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlClient.java
+++ b/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlClient.java
@@ -54,7 +54,7 @@ public void beforeStart(final ClientCallStreamObserver requestStre
             this.requestStream = requestStream;
             // Set up manual flow control for the response stream. It feels backwards to configure the response
             // stream's flow control using the request stream's observer, but this is the way it is.
-            requestStream.disableAutoInboundFlowControl();
+            requestStream.disableAutoRequest();
 
             // Set up a back-pressure-aware producer for the request stream. The onReadyHandler will be invoked
             // when the consuming side has enough buffer space to receive more messages.
@@ -111,7 +111,9 @@ public void onCompleted() {
         };
 
     // Note: clientResponseObserver is handling both request and response stream processing.
-    stub.sayHelloStreaming(clientResponseObserver);
+    ClientCallStreamObserver requestObserver =
+        (ClientCallStreamObserver) stub.sayHelloStreaming(clientResponseObserver);
+    requestObserver.request(1);
 
     done.await();
 
diff --git a/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlServer.java b/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlServer.java
index 4ad59308437..de8142596ea 100644
--- a/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlServer.java
+++ b/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlServer.java
@@ -39,7 +39,7 @@ public StreamObserver sayHelloStreaming(final StreamObserver serverCallStreamObserver =
             (ServerCallStreamObserver) responseObserver;
-        serverCallStreamObserver.disableAutoInboundFlowControl();
+        serverCallStreamObserver.disableAutoRequest();
 
         // Set up a back-pressure-aware consumer for the request stream. The onReadyHandler will be invoked
         // when the consuming side has enough buffer space to receive more messages.

From 1e8181a286ef78047ed40496677afebed5957609 Mon Sep 17 00:00:00 2001
From: DRayX <7531689+DRayX@users.noreply.github.com>
Date: Mon, 9 Mar 2020 15:55:35 -0700
Subject: [PATCH 07/24] Update ProtoReflectionService to use disableAutoRequest

---
 .../io/grpc/protobuf/services/ProtoReflectionService.java  | 2 +-
 .../grpc/protobuf/services/ProtoReflectionServiceTest.java | 7 +++++--
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/services/src/main/java/io/grpc/protobuf/services/ProtoReflectionService.java b/services/src/main/java/io/grpc/protobuf/services/ProtoReflectionService.java
index beadb0f1eba..3ec30a1df34 100644
--- a/services/src/main/java/io/grpc/protobuf/services/ProtoReflectionService.java
+++ b/services/src/main/java/io/grpc/protobuf/services/ProtoReflectionService.java
@@ -135,7 +135,7 @@ public StreamObserver serverReflectionInfo(
     ProtoReflectionStreamObserver requestObserver =
         new ProtoReflectionStreamObserver(updateIndexIfNecessary(), serverCallStreamObserver);
     serverCallStreamObserver.setOnReadyHandler(requestObserver);
-    serverCallStreamObserver.disableAutoInboundFlowControl();
+    serverCallStreamObserver.disableAutoRequest();
     serverCallStreamObserver.request(1);
     return requestObserver;
   }
diff --git a/services/src/test/java/io/grpc/protobuf/services/ProtoReflectionServiceTest.java b/services/src/test/java/io/grpc/protobuf/services/ProtoReflectionServiceTest.java
index 00cc42ca3bd..b75225ab434 100644
--- a/services/src/test/java/io/grpc/protobuf/services/ProtoReflectionServiceTest.java
+++ b/services/src/test/java/io/grpc/protobuf/services/ProtoReflectionServiceTest.java
@@ -532,8 +532,11 @@ public void flowControl() throws Exception {
         (ClientCallStreamObserver)
             stub.serverReflectionInfo(clientResponseObserver);
 
-    // ClientCalls.startCall() calls request(1) initially, so we should get an immediate response.
+    // Verify we don't receive a response until we request it.    requestObserver.onNext(flowControlRequest);
     requestObserver.onNext(flowControlRequest);
+    assertEquals(0, clientResponseObserver.getResponses().size());
+
+    requestObserver.request(1);
     assertEquals(1, clientResponseObserver.getResponses().size());
     assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(0));
 
@@ -594,7 +597,7 @@ private static class FlowControlClientResponseObserver
 
     @Override
     public void beforeStart(final ClientCallStreamObserver requestStream) {
-      requestStream.disableAutoInboundFlowControl();
+      requestStream.disableAutoRequest();
     }
 
     @Override

From 24d6763e50b9042bc7fcd3842c21f7c7743c86db Mon Sep 17 00:00:00 2001
From: DRayX <7531689+DRayX@users.noreply.github.com>
Date: Mon, 9 Mar 2020 15:56:51 -0700
Subject: [PATCH 08/24] Fixed mistake in comment

---
 .../io/grpc/protobuf/services/ProtoReflectionServiceTest.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/services/src/test/java/io/grpc/protobuf/services/ProtoReflectionServiceTest.java b/services/src/test/java/io/grpc/protobuf/services/ProtoReflectionServiceTest.java
index b75225ab434..d81c54a14b3 100644
--- a/services/src/test/java/io/grpc/protobuf/services/ProtoReflectionServiceTest.java
+++ b/services/src/test/java/io/grpc/protobuf/services/ProtoReflectionServiceTest.java
@@ -532,7 +532,7 @@ public void flowControl() throws Exception {
         (ClientCallStreamObserver)
             stub.serverReflectionInfo(clientResponseObserver);
 
-    // Verify we don't receive a response until we request it.    requestObserver.onNext(flowControlRequest);
+    // Verify we don't receive a response until we request it.
     requestObserver.onNext(flowControlRequest);
     assertEquals(0, clientResponseObserver.getResponses().size());
 

From c7083ae25357adbb05df2f0da19c0735712b1b7b Mon Sep 17 00:00:00 2001
From: DRayX <7531689+DRayX@users.noreply.github.com>
Date: Mon, 9 Mar 2020 16:26:40 -0700
Subject: [PATCH 09/24] Fix flowControlOnCompleteWithPendingRequest test

---
 .../protobuf/services/ProtoReflectionServiceTest.java     | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/services/src/test/java/io/grpc/protobuf/services/ProtoReflectionServiceTest.java b/services/src/test/java/io/grpc/protobuf/services/ProtoReflectionServiceTest.java
index d81c54a14b3..c8b1e0b7ad1 100644
--- a/services/src/test/java/io/grpc/protobuf/services/ProtoReflectionServiceTest.java
+++ b/services/src/test/java/io/grpc/protobuf/services/ProtoReflectionServiceTest.java
@@ -560,17 +560,15 @@ public void flowControlOnCompleteWithPendingRequest() throws Exception {
         (ClientCallStreamObserver)
             stub.serverReflectionInfo(clientResponseObserver);
 
-    // ClientCalls.startCall() calls request(1) initially, so make additional request.
-    requestObserver.onNext(flowControlRequest);
     requestObserver.onNext(flowControlRequest);
     requestObserver.onCompleted();
-    assertEquals(1, clientResponseObserver.getResponses().size());
+    assertEquals(0, clientResponseObserver.getResponses().size());
     assertFalse(clientResponseObserver.onCompleteCalled());
 
     requestObserver.request(1);
     assertTrue(clientResponseObserver.onCompleteCalled());
-    assertEquals(2, clientResponseObserver.getResponses().size());
-    assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(1));
+    assertEquals(1, clientResponseObserver.getResponses().size());
+    assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(0));
   }
 
   private final ServerReflectionRequest flowControlRequest =

From 2014d062a92b3f82d21cf78f4eee67553806b612 Mon Sep 17 00:00:00 2001
From: DRayX <7531689+DRayX@users.noreply.github.com>
Date: Mon, 9 Mar 2020 17:10:17 -0700
Subject: [PATCH 10/24] Update ManualFlowControlClient example

Update the manual flow control client example to use a pattern that is applicable in all call arities (unary or streaming request).  If the call is unary request, the StreamObserver isn't returned, so the initial request can't be started from the return value, but the initial request can't be made before the call is started, so we expose a custom "request" method on the ClinetResponseObserver implementation.
---
 .../ManualFlowControlClient.java              | 131 +++++++++---------
 1 file changed, 67 insertions(+), 64 deletions(-)

diff --git a/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlClient.java b/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlClient.java
index 3def96f136a..44d33d6e5d5 100644
--- a/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlClient.java
+++ b/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlClient.java
@@ -44,76 +44,79 @@ public static void main(String[] args) throws InterruptedException {
 
     // When using manual flow-control and back-pressure on the client, the ClientResponseObserver handles both
     // request and response streams.
-    ClientResponseObserver clientResponseObserver =
-        new ClientResponseObserver() {
-
-          ClientCallStreamObserver requestStream;
+    class FlowControlResponseObserver implements ClientResponseObserver {
+
+      ClientCallStreamObserver requestStream;
+
+      @Override
+      public void beforeStart(final ClientCallStreamObserver requestStream) {
+        this.requestStream = requestStream;
+        // Set up manual flow control for the response stream. It feels backwards to configure the response
+        // stream's flow control using the request stream's observer, but this is the way it is.
+        requestStream.disableAutoRequest();
+
+        // Set up a back-pressure-aware producer for the request stream. The onReadyHandler will be invoked
+        // when the consuming side has enough buffer space to receive more messages.
+        //
+        // Messages are serialized into a transport-specific transmit buffer. Depending on the size of this buffer,
+        // MANY messages may be buffered, however, they haven't yet been sent to the server. The server must call
+        // request() to pull a buffered message from the client.
+        //
+        // Note: the onReadyHandler's invocation is serialized on the same thread pool as the incoming
+        // StreamObserver's onNext(), onError(), and onComplete() handlers. Blocking the onReadyHandler will prevent
+        // additional messages from being processed by the incoming StreamObserver. The onReadyHandler must return
+        // in a timely manner or else message processing throughput will suffer.
+        requestStream.setOnReadyHandler(new Runnable() {
+          // An iterator is used so we can pause and resume iteration of the request data.
+          Iterator iterator = names().iterator();
 
           @Override
-          public void beforeStart(final ClientCallStreamObserver requestStream) {
-            this.requestStream = requestStream;
-            // Set up manual flow control for the response stream. It feels backwards to configure the response
-            // stream's flow control using the request stream's observer, but this is the way it is.
-            requestStream.disableAutoRequest();
-
-            // Set up a back-pressure-aware producer for the request stream. The onReadyHandler will be invoked
-            // when the consuming side has enough buffer space to receive more messages.
-            //
-            // Messages are serialized into a transport-specific transmit buffer. Depending on the size of this buffer,
-            // MANY messages may be buffered, however, they haven't yet been sent to the server. The server must call
-            // request() to pull a buffered message from the client.
-            //
-            // Note: the onReadyHandler's invocation is serialized on the same thread pool as the incoming
-            // StreamObserver's onNext(), onError(), and onComplete() handlers. Blocking the onReadyHandler will prevent
-            // additional messages from being processed by the incoming StreamObserver. The onReadyHandler must return
-            // in a timely manner or else message processing throughput will suffer.
-            requestStream.setOnReadyHandler(new Runnable() {
-              // An iterator is used so we can pause and resume iteration of the request data.
-              Iterator iterator = names().iterator();
-
-              @Override
-              public void run() {
-                // Start generating values from where we left off on a non-gRPC thread.
-                while (requestStream.isReady()) {
-                  if (iterator.hasNext()) {
-                      // Send more messages if there are more messages to send.
-                      String name = iterator.next();
-                      logger.info("--> " + name);
-                      HelloRequest request = HelloRequest.newBuilder().setName(name).build();
-                      requestStream.onNext(request);
-                  } else {
-                      // Signal completion if there is nothing left to send.
-                      requestStream.onCompleted();
-                  }
-                }
+          public void run() {
+            // Start generating values from where we left off on a non-gRPC thread.
+            while (requestStream.isReady()) {
+              if (iterator.hasNext()) {
+                // Send more messages if there are more messages to send.
+                String name = iterator.next();
+                logger.info("--> " + name);
+                HelloRequest request = HelloRequest.newBuilder().setName(name).build();
+                requestStream.onNext(request);
+              } else {
+                // Signal completion if there is nothing left to send.
+                requestStream.onCompleted();
               }
-            });
-          }
-
-          @Override
-          public void onNext(HelloReply value) {
-            logger.info("<-- " + value.getMessage());
-            // Signal the sender to send one message.
-            requestStream.request(1);
-          }
-
-          @Override
-          public void onError(Throwable t) {
-            t.printStackTrace();
-            done.countDown();
-          }
-
-          @Override
-          public void onCompleted() {
-            logger.info("All Done");
-            done.countDown();
+            }
           }
-        };
+        });
+      }
+
+      @Override
+      public void onNext(HelloReply value) {
+        logger.info("<-- " + value.getMessage());
+        // Signal the sender to send one message.
+        requestStream.request(1);
+      }
+
+      @Override
+      public void onError(Throwable t) {
+        t.printStackTrace();
+        done.countDown();
+      }
+
+      @Override
+      public void onCompleted() {
+        logger.info("All Done");
+        done.countDown();
+      }
+
+      void request(int numMessages) {
+        requestStream.request(numMessages);
+      }
+    }
+    FlowControlResponseObserver responseObserver = new FlowControlResponseObserver();
 
     // Note: clientResponseObserver is handling both request and response stream processing.
-    ClientCallStreamObserver requestObserver =
-        (ClientCallStreamObserver) stub.sayHelloStreaming(clientResponseObserver);
-    requestObserver.request(1);
+    stub.sayHelloStreaming(responseObserver);
+    responseObserver.request(1);
 
     done.await();
 

From 4f90e0d9f23097d1681235115725bc5cb334d53f Mon Sep 17 00:00:00 2001
From: DRayX <7531689+DRayX@users.noreply.github.com>
Date: Mon, 9 Mar 2020 17:16:29 -0700
Subject: [PATCH 11/24] Update {Client|Server}CallsTest

---
 .../java/io/grpc/stub/ClientCallsTest.java    | 101 +++++++++++++-----
 .../java/io/grpc/stub/ServerCallsTest.java    |  32 +++++-
 2 files changed, 105 insertions(+), 28 deletions(-)

diff --git a/stub/src/test/java/io/grpc/stub/ClientCallsTest.java b/stub/src/test/java/io/grpc/stub/ClientCallsTest.java
index a6364877063..5d9f363e0f0 100644
--- a/stub/src/test/java/io/grpc/stub/ClientCallsTest.java
+++ b/stub/src/test/java/io/grpc/stub/ClientCallsTest.java
@@ -331,6 +331,7 @@ public void request(int numMessages) {
     };
     ClientCalls.asyncBidiStreamingCall(call, new ClientResponseObserver() {
       @Override
+      @SuppressWarnings("deprecation")
       public void beforeStart(ClientCallStreamObserver requestStream) {
         requestStream.disableAutoInboundFlowControl();
       }
@@ -354,6 +355,48 @@ public void onCompleted() {
     assertThat(requests).containsExactly(1);
   }
 
+  @Test
+  public void disablingAutoRequestSuppressesRequests()
+      throws Exception {
+    final AtomicReference> listener =
+        new AtomicReference<>();
+    final List requests = new ArrayList<>();
+    NoopClientCall call = new NoopClientCall() {
+      @Override
+      public void start(io.grpc.ClientCall.Listener responseListener, Metadata headers) {
+        listener.set(responseListener);
+      }
+
+      @Override
+      public void request(int numMessages) {
+        requests.add(numMessages);
+      }
+    };
+    ClientCalls.asyncBidiStreamingCall(call, new ClientResponseObserver() {
+      @Override
+      public void beforeStart(ClientCallStreamObserver requestStream) {
+        requestStream.disableAutoRequest();
+      }
+
+      @Override
+      public void onNext(String value) {
+
+      }
+
+      @Override
+      public void onError(Throwable t) {
+
+      }
+
+      @Override
+      public void onCompleted() {
+
+      }
+    });
+    listener.get().onMessage("message");
+    assertThat(requests).isEmpty();
+  }
+
   @Test
   public void callStreamObserverPropagatesFlowControlRequestsToCall()
       throws Exception {
@@ -361,7 +404,7 @@ public void callStreamObserverPropagatesFlowControlRequestsToCall()
         new ClientResponseObserver() {
           @Override
           public void beforeStart(ClientCallStreamObserver requestStream) {
-            requestStream.disableAutoInboundFlowControl();
+            requestStream.disableAutoRequest();
           }
 
           @Override
@@ -402,26 +445,34 @@ public void request(int numMessages) {
   public void canCaptureInboundFlowControlForServerStreamingObserver()
       throws Exception {
 
-    ClientResponseObserver responseObserver =
-        new ClientResponseObserver() {
-          @Override
-          public void beforeStart(ClientCallStreamObserver requestStream) {
-            requestStream.disableAutoInboundFlowControl();
-            requestStream.request(5);
-          }
+    class ResponseObserver implements ClientResponseObserver {
 
-          @Override
-          public void onNext(String value) {
-          }
+      private ClientCallStreamObserver requestStream;
 
-          @Override
-          public void onError(Throwable t) {
-          }
+      @Override
+      public void beforeStart(ClientCallStreamObserver requestStream) {
+        this.requestStream = requestStream;
+        requestStream.disableAutoRequest();
+      }
 
-          @Override
-          public void onCompleted() {
-          }
-        };
+      @Override
+      public void onNext(String value) {
+      }
+
+      @Override
+      public void onError(Throwable t) {
+      }
+
+      @Override
+      public void onCompleted() {
+      }
+
+      void request(int numMessages) {
+        requestStream.request(numMessages);
+      }
+    }
+
+    ResponseObserver responseObserver = new ResponseObserver();
     final AtomicReference> listener =
         new AtomicReference<>();
     final List requests = new ArrayList<>();
@@ -437,8 +488,9 @@ public void request(int numMessages) {
       }
     };
     ClientCalls.asyncServerStreamingCall(call, 1, responseObserver);
+    responseObserver.request(5);
     listener.get().onMessage("message");
-    assertThat(requests).containsExactly(5, 1).inOrder();
+    assertThat(requests).containsExactly(5).inOrder();
   }
 
   @Test
@@ -486,7 +538,7 @@ public void onCompleted() {
         new ClientResponseObserver() {
           @Override
           public void beforeStart(final ClientCallStreamObserver requestStream) {
-            requestStream.disableAutoInboundFlowControl();
+            requestStream.disableAutoRequest();
           }
 
           @Override
@@ -508,14 +560,15 @@ public void onCompleted() {
 
     CallStreamObserver integerStreamObserver = (CallStreamObserver)
         ClientCalls.asyncBidiStreamingCall(clientCall, responseObserver);
-    semaphore.acquire();
+    integerStreamObserver.request(1);
+    assertTrue(semaphore.tryAcquire(5, TimeUnit.SECONDS));
     integerStreamObserver.request(2);
-    semaphore.acquire();
+    assertTrue(semaphore.tryAcquire(5, TimeUnit.SECONDS));
     integerStreamObserver.request(3);
     integerStreamObserver.onCompleted();
     assertTrue(latch.await(5, TimeUnit.SECONDS));
     // Verify that number of messages produced in each onReady handler call matches the number
-    // requested by the client. Note that ClientCalls.asyncBidiStreamingCall will request(1)
+    // requested by the client.
     assertEquals(Arrays.asList(0, 1, 1, 2, 2, 2), receivedMessages);
   }
 
@@ -533,7 +586,7 @@ public void inprocessTransportOutboundFlowControl() throws Exception {
               public StreamObserver invoke(StreamObserver responseObserver) {
                 final ServerCallStreamObserver serverCallObserver =
                     (ServerCallStreamObserver) responseObserver;
-                serverCallObserver.disableAutoInboundFlowControl();
+                serverCallObserver.disableAutoRequest();
                 observerFuture.set(serverCallObserver);
                 return new StreamObserver() {
                   @Override
diff --git a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java
index 04f88755429..93793ddfdd9 100644
--- a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java
+++ b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java
@@ -260,7 +260,7 @@ public void run() {
   }
 
   @Test
-  public void cannotDisableAutoFlowControlAfterServiceInvocation() throws Exception {
+  public void cannotDisableAutoRequestAfterServiceInvocation() throws Exception {
     final AtomicReference> callObserver =
         new AtomicReference<>();
     ServerCallHandler callHandler =
@@ -276,7 +276,7 @@ public StreamObserver invoke(StreamObserver responseObserver)
         callHandler.startCall(serverCall, new Metadata());
     callListener.onMessage(1);
     try {
-      callObserver.get().disableAutoInboundFlowControl();
+      callObserver.get().disableAutoRequest();
       fail("Cannot set onCancel handler after service invocation");
     } catch (IllegalStateException expected) {
       // Expected
@@ -289,6 +289,7 @@ public void disablingInboundAutoFlowControlSuppressesRequestsForMoreMessages() t
         ServerCalls.asyncBidiStreamingCall(
             new ServerCalls.BidiStreamingMethod() {
               @Override
+              @SuppressWarnings("deprecation")
               public StreamObserver invoke(StreamObserver responseObserver) {
                 ServerCallStreamObserver serverCallObserver =
                     (ServerCallStreamObserver) responseObserver;
@@ -307,7 +308,30 @@ public StreamObserver invoke(StreamObserver responseObserver)
   }
 
   @Test
-  public void disablingInboundAutoFlowControlForUnaryHasNoEffect() throws Exception {
+  public void disablingInboundAutoRequestSuppressesRequestsForMoreMessages() throws Exception {
+    ServerCallHandler callHandler =
+        ServerCalls.asyncBidiStreamingCall(
+            new ServerCalls.BidiStreamingMethod() {
+              @Override
+              public StreamObserver invoke(StreamObserver responseObserver) {
+                ServerCallStreamObserver serverCallObserver =
+                    (ServerCallStreamObserver) responseObserver;
+                serverCallObserver.disableAutoRequest();
+                return new ServerCalls.NoopStreamObserver<>();
+              }
+            });
+    ServerCall.Listener callListener =
+        callHandler.startCall(serverCall, new Metadata());
+    callListener.onReady();
+    // Transport should not call this if nothing has been requested but forcing it here
+    // to verify that message delivery does not trigger a call to request(1).
+    callListener.onMessage(1);
+    // Should never be called
+    assertThat(serverCall.requestCalls).isEmpty();
+  }
+
+  @Test
+  public void disablingInboundAutoRequestForUnaryHasNoEffect() throws Exception {
     ServerCallHandler callHandler =
         ServerCalls.asyncUnaryCall(
             new ServerCalls.UnaryMethod() {
@@ -315,7 +339,7 @@ public void disablingInboundAutoFlowControlForUnaryHasNoEffect() throws Exceptio
               public void invoke(Integer req, StreamObserver responseObserver) {
                 ServerCallStreamObserver serverCallObserver =
                     (ServerCallStreamObserver) responseObserver;
-                serverCallObserver.disableAutoInboundFlowControl();
+                serverCallObserver.disableAutoRequest();
               }
             });
     callHandler.startCall(serverCall, new Metadata());

From 80fc740142cf2d7c148b5f7b9183b0de6cb2945b Mon Sep 17 00:00:00 2001
From: DRayX <7531689+DRayX@users.noreply.github.com>
Date: Mon, 9 Mar 2020 17:35:45 -0700
Subject: [PATCH 12/24] Update ClientCalls.java

---
 stub/src/main/java/io/grpc/stub/ClientCalls.java | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java
index 62ef3d492da..49a787a3363 100644
--- a/stub/src/main/java/io/grpc/stub/ClientCalls.java
+++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java
@@ -384,7 +384,10 @@ public void disableAutoInboundFlowControl() {
 
     @Override
     public void disableAutoRequest() {
-      checkState(!frozen, "Cannot disable auto flow control after call started. Use ClientResponseObserver");
+      if (frozen) {
+        throw new IllegalStateException(
+            "Cannot disable auto flow control after call started. Use ClientResponseObserver");
+      }
       autoRequestMode = AutoRequestMode.DISABLED;
     }
 

From c149edc24c0ea6a3b785348d8cc62699ac477509 Mon Sep 17 00:00:00 2001
From: DRayX <7531689+DRayX@users.noreply.github.com>
Date: Tue, 10 Mar 2020 15:53:22 -0700
Subject: [PATCH 13/24] Update CallStreamObserver.java

---
 stub/src/main/java/io/grpc/stub/CallStreamObserver.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/stub/src/main/java/io/grpc/stub/CallStreamObserver.java b/stub/src/main/java/io/grpc/stub/CallStreamObserver.java
index 07cf5769123..cfae34811eb 100644
--- a/stub/src/main/java/io/grpc/stub/CallStreamObserver.java
+++ b/stub/src/main/java/io/grpc/stub/CallStreamObserver.java
@@ -113,8 +113,8 @@ public abstract class CallStreamObserver implements StreamObserver {
   public abstract void disableAutoInboundFlowControl();
 
   /**
-   * Disables automatic flow control where initial tokens are requested when the call is started,
-   * and a token is returned to the peer after a call to the 'inbound' {@link
+   * Disables automatic flow control where initial messages are requested when the call is started,
+   * and an additional message is requested to be read after a call to the 'inbound' {@link
    * io.grpc.stub.StreamObserver#onNext(Object)} has completed. If disabled an application must
    * make explicit calls to {@link #request} to receive any messages.
    *
@@ -122,7 +122,7 @@ public abstract class CallStreamObserver implements StreamObserver {
    * ClientResponseObserver#beforeStart}. On server-side it may only be called during the initial
    * call to the application, before the service returns its {@code StreamObserver}.
    *
-   * 

Note that for server-side cases where the message is recieved before the handler is invoked, + *

Note that for server-side cases where the message is received before the handler is invoked, * this method will have no effect. This is true for: * *

    From effb394975d225f989e7d76e6496e7a6648ad91e Mon Sep 17 00:00:00 2001 From: DRayX <7531689+DRayX@users.noreply.github.com> Date: Tue, 10 Mar 2020 16:08:50 -0700 Subject: [PATCH 14/24] Add a note about migrating from disableAutoInboundFlowControl to disableAutoRequest --- .../java/io/grpc/stub/CallStreamObserver.java | 29 +++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/stub/src/main/java/io/grpc/stub/CallStreamObserver.java b/stub/src/main/java/io/grpc/stub/CallStreamObserver.java index cfae34811eb..c20f372dcc2 100644 --- a/stub/src/main/java/io/grpc/stub/CallStreamObserver.java +++ b/stub/src/main/java/io/grpc/stub/CallStreamObserver.java @@ -106,8 +106,33 @@ public abstract class CallStreamObserver implements StreamObserver { * *
*

- * - * @deprecated Use {@link #disableAutoRequest} instead. This method will be removed. + * + *

Migrating to {@link #disableAutoRequest} requires making adding an initial {@link #request} + * after the call has started. For example: + * + *

{@code
+   * Rectangle request = ...;
+   * class ResponseObserver implements ClientResponseObserver {
+   *   private ClientCallStreamObserver requestObserver;
+   *   ...
+   *   @Override public void beforeStart(ClientCallStreamObserver requestObserver) {
+   *     this.requestObserver = requestObserver;
+   *     requestObserver.disableAutoRequest();
+   *     // Note that requestObserver.request can not be called before the call is started
+   *     ...
+   *   }
+   *   // Need to expose the request method outside the Observer
+   *   void request(int numMessages) {
+   *     requestObserver.request(numMessages);
+   *   }
+   * }
+   * ResponseObserver responseObserver = new ResponseObserver();
+   * asyncStub.listFeatures(request, responseObserver);
+   * responseObserver.request(1);
+   * }
+ * + * @deprecated Use {@link #disableAutoRequest} instead (see note above about migrating). This + * method will be removed. */ @Deprecated public abstract void disableAutoInboundFlowControl(); From b88f02fa4db5cc5e92c7f0b30501249b4598e3e9 Mon Sep 17 00:00:00 2001 From: DRayX <7531689+DRayX@users.noreply.github.com> Date: Tue, 10 Mar 2020 16:12:47 -0700 Subject: [PATCH 15/24] Update CallStreamObserver.java Noted the difference between client and server migration --- stub/src/main/java/io/grpc/stub/CallStreamObserver.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/stub/src/main/java/io/grpc/stub/CallStreamObserver.java b/stub/src/main/java/io/grpc/stub/CallStreamObserver.java index c20f372dcc2..67073db57c6 100644 --- a/stub/src/main/java/io/grpc/stub/CallStreamObserver.java +++ b/stub/src/main/java/io/grpc/stub/CallStreamObserver.java @@ -107,8 +107,9 @@ public abstract class CallStreamObserver implements StreamObserver { * *

* - *

Migrating to {@link #disableAutoRequest} requires making adding an initial {@link #request} - * after the call has started. For example: + *

On the server side, migrating to {@link #disableAutoRequest} requires no changes; they have + * identical behavior. On the client-side, migrating to {@code disableAutoRequest} requires + * making adding an initial {@link #request} after the call has started. For example: * *

{@code
    * Rectangle request = ...;

From 4285b05b8b60fcec4cd50e46b7096e70011bacf9 Mon Sep 17 00:00:00 2001
From: DRayX <7531689+DRayX@users.noreply.github.com>
Date: Tue, 10 Mar 2020 17:26:23 -0700
Subject: [PATCH 16/24] Update CallStreamObserver.java

Fix indentation
---
 stub/src/main/java/io/grpc/stub/CallStreamObserver.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/stub/src/main/java/io/grpc/stub/CallStreamObserver.java b/stub/src/main/java/io/grpc/stub/CallStreamObserver.java
index 67073db57c6..e8a52db00c7 100644
--- a/stub/src/main/java/io/grpc/stub/CallStreamObserver.java
+++ b/stub/src/main/java/io/grpc/stub/CallStreamObserver.java
@@ -133,7 +133,7 @@ public abstract class CallStreamObserver implements StreamObserver {
    * }
* * @deprecated Use {@link #disableAutoRequest} instead (see note above about migrating). This - * method will be removed. + * method will be removed. */ @Deprecated public abstract void disableAutoInboundFlowControl(); From d2b25e625b47892d466706b1f09cdbb53109ba3d Mon Sep 17 00:00:00 2001 From: DRayX <7531689+DRayX@users.noreply.github.com> Date: Fri, 3 Apr 2020 17:54:22 -0700 Subject: [PATCH 17/24] Switch to disableAutoRequestWithInitial Switch to a disableAutoRequestWithInitialMethod that allows for an initial number of requests to be specified. Also, change the behavior of ClientCalls.CallToStreamObserverAdapter.request to request 2 if expecting unary respone. --- .../ManualFlowControlClient.java | 129 +++++++++--------- .../ManualFlowControlServer.java | 2 +- .../services/ProtoReflectionService.java | 3 +- .../services/ProtoReflectionServiceTest.java | 2 +- .../java/io/grpc/stub/CallStreamObserver.java | 38 ++---- .../main/java/io/grpc/stub/ClientCalls.java | 63 ++++----- .../io/grpc/stub/ClientResponseObserver.java | 3 +- .../main/java/io/grpc/stub/ServerCalls.java | 11 +- .../java/io/grpc/stub/ClientCallsTest.java | 10 +- .../java/io/grpc/stub/ServerCallsTest.java | 6 +- 10 files changed, 117 insertions(+), 150 deletions(-) diff --git a/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlClient.java b/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlClient.java index 44d33d6e5d5..324392db150 100644 --- a/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlClient.java +++ b/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlClient.java @@ -44,79 +44,74 @@ public static void main(String[] args) throws InterruptedException { // When using manual flow-control and back-pressure on the client, the ClientResponseObserver handles both // request and response streams. - class FlowControlResponseObserver implements ClientResponseObserver { - - ClientCallStreamObserver requestStream; - - @Override - public void beforeStart(final ClientCallStreamObserver requestStream) { - this.requestStream = requestStream; - // Set up manual flow control for the response stream. It feels backwards to configure the response - // stream's flow control using the request stream's observer, but this is the way it is. - requestStream.disableAutoRequest(); - - // Set up a back-pressure-aware producer for the request stream. The onReadyHandler will be invoked - // when the consuming side has enough buffer space to receive more messages. - // - // Messages are serialized into a transport-specific transmit buffer. Depending on the size of this buffer, - // MANY messages may be buffered, however, they haven't yet been sent to the server. The server must call - // request() to pull a buffered message from the client. - // - // Note: the onReadyHandler's invocation is serialized on the same thread pool as the incoming - // StreamObserver's onNext(), onError(), and onComplete() handlers. Blocking the onReadyHandler will prevent - // additional messages from being processed by the incoming StreamObserver. The onReadyHandler must return - // in a timely manner or else message processing throughput will suffer. - requestStream.setOnReadyHandler(new Runnable() { - // An iterator is used so we can pause and resume iteration of the request data. - Iterator iterator = names().iterator(); + ClientResponseObserver clientResponseObserver = + new ClientResponseObserver() { + + ClientCallStreamObserver requestStream; @Override - public void run() { - // Start generating values from where we left off on a non-gRPC thread. - while (requestStream.isReady()) { - if (iterator.hasNext()) { - // Send more messages if there are more messages to send. - String name = iterator.next(); - logger.info("--> " + name); - HelloRequest request = HelloRequest.newBuilder().setName(name).build(); - requestStream.onNext(request); - } else { - // Signal completion if there is nothing left to send. - requestStream.onCompleted(); + public void beforeStart(final ClientCallStreamObserver requestStream) { + this.requestStream = requestStream; + // Set up manual flow control for the response stream. It feels backwards to configure the response + // stream's flow control using the request stream's observer, but this is the way it is. + requestStream.disableAutoRequestWithInitial(1); + + // Set up a back-pressure-aware producer for the request stream. The onReadyHandler will be invoked + // when the consuming side has enough buffer space to receive more messages. + // + // Messages are serialized into a transport-specific transmit buffer. Depending on the size of this buffer, + // MANY messages may be buffered, however, they haven't yet been sent to the server. The server must call + // request() to pull a buffered message from the client. + // + // Note: the onReadyHandler's invocation is serialized on the same thread pool as the incoming + // StreamObserver's onNext(), onError(), and onComplete() handlers. Blocking the onReadyHandler will prevent + // additional messages from being processed by the incoming StreamObserver. The onReadyHandler must return + // in a timely manner or else message processing throughput will suffer. + requestStream.setOnReadyHandler(new Runnable() { + // An iterator is used so we can pause and resume iteration of the request data. + Iterator iterator = names().iterator(); + + @Override + public void run() { + // Start generating values from where we left off on a non-gRPC thread. + while (requestStream.isReady()) { + if (iterator.hasNext()) { + // Send more messages if there are more messages to send. + String name = iterator.next(); + logger.info("--> " + name); + HelloRequest request = HelloRequest.newBuilder().setName(name).build(); + requestStream.onNext(request); + } else { + // Signal completion if there is nothing left to send. + requestStream.onCompleted(); + } + } } - } + }); + } + + @Override + public void onNext(HelloReply value) { + logger.info("<-- " + value.getMessage()); + // Signal the sender to send one message. + requestStream.request(1); + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + done.countDown(); + } + + @Override + public void onCompleted() { + logger.info("All Done"); + done.countDown(); } - }); - } - - @Override - public void onNext(HelloReply value) { - logger.info("<-- " + value.getMessage()); - // Signal the sender to send one message. - requestStream.request(1); - } - - @Override - public void onError(Throwable t) { - t.printStackTrace(); - done.countDown(); - } - - @Override - public void onCompleted() { - logger.info("All Done"); - done.countDown(); - } - - void request(int numMessages) { - requestStream.request(numMessages); - } - } - FlowControlResponseObserver responseObserver = new FlowControlResponseObserver(); + }; // Note: clientResponseObserver is handling both request and response stream processing. - stub.sayHelloStreaming(responseObserver); - responseObserver.request(1); + stub.sayHelloStreaming(clientResponseObserver); done.await(); diff --git a/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlServer.java b/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlServer.java index de8142596ea..b8fbff65a30 100644 --- a/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlServer.java +++ b/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlServer.java @@ -39,7 +39,7 @@ public StreamObserver sayHelloStreaming(final StreamObserver serverCallStreamObserver = (ServerCallStreamObserver) responseObserver; - serverCallStreamObserver.disableAutoRequest(); + serverCallStreamObserver.disableAutoRequestWithInitial(0); // Set up a back-pressure-aware consumer for the request stream. The onReadyHandler will be invoked // when the consuming side has enough buffer space to receive more messages. diff --git a/services/src/main/java/io/grpc/protobuf/services/ProtoReflectionService.java b/services/src/main/java/io/grpc/protobuf/services/ProtoReflectionService.java index 3ec30a1df34..9be04d753f8 100644 --- a/services/src/main/java/io/grpc/protobuf/services/ProtoReflectionService.java +++ b/services/src/main/java/io/grpc/protobuf/services/ProtoReflectionService.java @@ -135,8 +135,7 @@ public StreamObserver serverReflectionInfo( ProtoReflectionStreamObserver requestObserver = new ProtoReflectionStreamObserver(updateIndexIfNecessary(), serverCallStreamObserver); serverCallStreamObserver.setOnReadyHandler(requestObserver); - serverCallStreamObserver.disableAutoRequest(); - serverCallStreamObserver.request(1); + serverCallStreamObserver.disableAutoRequestWithInitial(1); return requestObserver; } diff --git a/services/src/test/java/io/grpc/protobuf/services/ProtoReflectionServiceTest.java b/services/src/test/java/io/grpc/protobuf/services/ProtoReflectionServiceTest.java index c8b1e0b7ad1..8cd1041d36d 100644 --- a/services/src/test/java/io/grpc/protobuf/services/ProtoReflectionServiceTest.java +++ b/services/src/test/java/io/grpc/protobuf/services/ProtoReflectionServiceTest.java @@ -595,7 +595,7 @@ private static class FlowControlClientResponseObserver @Override public void beforeStart(final ClientCallStreamObserver requestStream) { - requestStream.disableAutoRequest(); + requestStream.disableAutoRequestWithInitial(0); } @Override diff --git a/stub/src/main/java/io/grpc/stub/CallStreamObserver.java b/stub/src/main/java/io/grpc/stub/CallStreamObserver.java index e8a52db00c7..1e9c68eb8da 100644 --- a/stub/src/main/java/io/grpc/stub/CallStreamObserver.java +++ b/stub/src/main/java/io/grpc/stub/CallStreamObserver.java @@ -107,30 +107,11 @@ public abstract class CallStreamObserver implements StreamObserver { * *

* - *

On the server side, migrating to {@link #disableAutoRequest} requires no changes; they have - * identical behavior. On the client-side, migrating to {@code disableAutoRequest} requires - * making adding an initial {@link #request} after the call has started. For example: - * - *

{@code
-   * Rectangle request = ...;
-   * class ResponseObserver implements ClientResponseObserver {
-   *   private ClientCallStreamObserver requestObserver;
-   *   ...
-   *   @Override public void beforeStart(ClientCallStreamObserver requestObserver) {
-   *     this.requestObserver = requestObserver;
-   *     requestObserver.disableAutoRequest();
-   *     // Note that requestObserver.request can not be called before the call is started
-   *     ...
-   *   }
-   *   // Need to expose the request method outside the Observer
-   *   void request(int numMessages) {
-   *     requestObserver.request(numMessages);
-   *   }
-   * }
-   * ResponseObserver responseObserver = new ResponseObserver();
-   * asyncStub.listFeatures(request, responseObserver);
-   * responseObserver.request(1);
-   * }
+ *

To migrate to {@link #disableAutoRequestWithInitial} on the server side, call + * {@code disableAutoRequestWithInitial(0)} as {@code disableAutoInboundFlowControl} + * already disables all inbound requests. On the client side, {@code + * disableAutoRequestWithInitial(1)} should be called to maintain existing behavior as + * {@code disableAutoInboundFlowControl} does not disable the initial request. * * @deprecated Use {@link #disableAutoRequest} instead (see note above about migrating). This * method will be removed. @@ -139,10 +120,9 @@ public abstract class CallStreamObserver implements StreamObserver { public abstract void disableAutoInboundFlowControl(); /** - * Disables automatic flow control where initial messages are requested when the call is started, - * and an additional message is requested to be read after a call to the 'inbound' {@link - * io.grpc.stub.StreamObserver#onNext(Object)} has completed. If disabled an application must - * make explicit calls to {@link #request} to receive any messages. + * Disables automatic flow control where an additional message is requested to be read after a + * call to the 'inbound' {@link io.grpc.stub.StreamObserver#onNext(Object)} has completed. A + * number of initial requests to make when the call is started may be specified. * *

On client-side this method may only be called during {@link * ClientResponseObserver#beforeStart}. On server-side it may only be called during the initial @@ -157,7 +137,7 @@ public abstract class CallStreamObserver implements StreamObserver { * *

*/ - public abstract void disableAutoRequest(); + public abstract void disableAutoRequestWithInitial(int request); /** * Requests the peer to produce {@code count} more messages to be delivered to the 'inbound' diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java index 49a787a3363..ab96c61e240 100644 --- a/stub/src/main/java/io/grpc/stub/ClientCalls.java +++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java @@ -274,8 +274,7 @@ private static void asyncUnaryRequestCall( req, new StreamObserverToCallListenerAdapter<>( responseObserver, - new CallToStreamObserverAdapter<>(call), - streamingResponse)); + new CallToStreamObserverAdapter<>(call, streamingResponse)); } private static void asyncUnaryRequestCall( @@ -297,11 +296,11 @@ private static StreamObserver asyncStreamingRequestCall( ClientCall call, StreamObserver responseObserver, boolean streamingResponse) { - CallToStreamObserverAdapter adapter = new CallToStreamObserverAdapter<>(call); + CallToStreamObserverAdapter adapter = new CallToStreamObserverAdapter<>( + call, streamingResponse); startCall( call, - new StreamObserverToCallListenerAdapter<>( - responseObserver, adapter, streamingResponse)); + new StreamObserverToCallListenerAdapter<>(responseObserver, adapter)); return adapter; } @@ -316,23 +315,20 @@ private abstract static class StartableListener extends ClientCall.Listener extends ClientCallStreamObserver { private boolean frozen; private final ClientCall call; + private final boolean streamingResponse; private Runnable onReadyHandler; - private AutoRequestMode autoRequestMode = AutoRequestMode.INITIAL_AND_NEXT; + private int initalRequest = 1; + private boolean autoRequestEnabled = true; private boolean aborted = false; private boolean completed = false; // Non private to avoid synthetic class CallToStreamObserverAdapter(ClientCall call) { this.call = call; + this.streamingResponse = streamingResponse; } private void freeze() { @@ -375,25 +371,29 @@ public void setOnReadyHandler(Runnable onReadyHandler) { @Deprecated @Override public void disableAutoInboundFlowControl() { - if (frozen) { - throw new IllegalStateException( - "Cannot disable auto flow control after call started. Use ClientResponseObserver"); - } - autoRequestMode = AutoRequestMode.INITIAL_ONLY; + disableAutoRequestWithInitial(1); } @Override - public void disableAutoRequest() { + public void disableAutoRequestWithInitial(int request) { if (frozen) { throw new IllegalStateException( "Cannot disable auto flow control after call started. Use ClientResponseObserver"); } - autoRequestMode = AutoRequestMode.DISABLED; + Preconditions.checkArugment(request >= 0, "Initial requests must be non-negative"); + initialRequest = request; + autoRequestEnabled = false; } @Override public void request(int count) { - call.request(count); + if (!streamingResponse && count == 1) { + // Initially ask for two responses from flow-control so that if a misbehaving server + // sends more than one responses, we can catch it and fail it in the listener. + call.request(2); + } else { + call.request(count); + } } @Override @@ -411,16 +411,13 @@ private static final class StreamObserverToCallListenerAdapter extends StartableListener { private final StreamObserver observer; private final CallToStreamObserverAdapter adapter; - private final boolean streamingResponse; private boolean firstResponseReceived; // Non private to avoid synthetic class StreamObserverToCallListenerAdapter( StreamObserver observer, - CallToStreamObserverAdapter adapter, - boolean streamingResponse) { + CallToStreamObserverAdapter adapter) { this.observer = observer; - this.streamingResponse = streamingResponse; this.adapter = adapter; if (observer instanceof ClientResponseObserver) { @SuppressWarnings("unchecked") @@ -437,7 +434,7 @@ public void onHeaders(Metadata headers) { @Override public void onMessage(RespT message) { - if (firstResponseReceived && !streamingResponse) { + if (firstResponseReceived && !adapter.streamingResponse) { throw Status.INTERNAL .withDescription("More than one responses received for unary or client-streaming call") .asRuntimeException(); @@ -445,7 +442,7 @@ public void onMessage(RespT message) { firstResponseReceived = true; observer.onNext(message); - if (streamingResponse && adapter.autoRequestMode == AutoRequestMode.INITIAL_AND_NEXT) { + if (adapter.streamingResponse && adapter.autoRequestEnabled) { // Request delivery of the next inbound message. adapter.request(1); } @@ -469,12 +466,8 @@ public void onReady() { @Override void onStart() { - if (adapter.autoRequestMode != AutoRequestMode.DISABLED) { - if (streamingResponse) { - adapter.request(1); - } else { - adapter.request(2); - } + if (adapter.initialRequest > 0) { + adapter.request(adapter.initialRequest); } } } @@ -521,7 +514,7 @@ public void onClose(Status status, Metadata trailers) { @Override void onStart() { - responseFuture.request(2); + responseFuture.call.request(2); } } @@ -552,10 +545,6 @@ protected boolean setException(Throwable throwable) { protected String pendingToString() { return MoreObjects.toStringHelper(this).add("clientCall", call).toString(); } - - void request(int numMessages) { - call.request(numMessages); - } } /** diff --git a/stub/src/main/java/io/grpc/stub/ClientResponseObserver.java b/stub/src/main/java/io/grpc/stub/ClientResponseObserver.java index e5557c51fbf..5f5510f6d3c 100644 --- a/stub/src/main/java/io/grpc/stub/ClientResponseObserver.java +++ b/stub/src/main/java/io/grpc/stub/ClientResponseObserver.java @@ -30,7 +30,8 @@ public interface ClientResponseObserver extends StreamObserverOnly the methods {@link ClientCallStreamObserver#setOnReadyHandler(Runnable)} and - * {@link ClientCallStreamObserver#disableAutoRequest()} may be called within this callback + * {@link ClientCallStreamObserver#disableAutoRequestWithInitial(int)} may be called within + * this callback * *
    *   // Copy an iterator to the request stream under flow-control
diff --git a/stub/src/main/java/io/grpc/stub/ServerCalls.java b/stub/src/main/java/io/grpc/stub/ServerCalls.java
index 8e5ed1e1068..0d40d325671 100644
--- a/stub/src/main/java/io/grpc/stub/ServerCalls.java
+++ b/stub/src/main/java/io/grpc/stub/ServerCalls.java
@@ -223,8 +223,8 @@ public ServerCall.Listener startCall(ServerCall call, Metadat
           new ServerCallStreamObserverImpl<>(call);
       StreamObserver requestObserver = method.invoke(responseObserver);
       responseObserver.freeze();
-      if (responseObserver.autoRequestEnabled) {
-        call.request(1);
+      if (responseObserver.initialRequest > 0) {
+        call.request(responseObserver.initialRequest);
       }
       return new StreamingServerCallListener(requestObserver, responseObserver, call);
     }
@@ -308,6 +308,7 @@ private static final class ServerCallStreamObserverImpl
     final ServerCall call;
     volatile boolean cancelled;
     private boolean frozen;
+    private int initialRequest;
     private boolean autoRequestEnabled = true;
     private boolean sentHeaders;
     private Runnable onReadyHandler;
@@ -402,12 +403,14 @@ public void setOnCancelHandler(Runnable onCancelHandler) {
     @Deprecated
     @Override
     public void disableAutoInboundFlowControl() {
-      disableAutoRequest();
+      disableAutoRequestWithInitial(0);
     }
 
     @Override
-    public void disableAutoRequest() {
+    public void disableAutoRequestWithInitial(int request) {
       checkState(!frozen, "Cannot disable auto flow control after initialization");
+      checkArugment(request >= 0, "Initial requests must be non-negative");
+      initialRequest = request;
       autoRequestEnabled = false;
     }
 
diff --git a/stub/src/test/java/io/grpc/stub/ClientCallsTest.java b/stub/src/test/java/io/grpc/stub/ClientCallsTest.java
index 5d9f363e0f0..e1f76294c97 100644
--- a/stub/src/test/java/io/grpc/stub/ClientCallsTest.java
+++ b/stub/src/test/java/io/grpc/stub/ClientCallsTest.java
@@ -375,7 +375,7 @@ public void request(int numMessages) {
     ClientCalls.asyncBidiStreamingCall(call, new ClientResponseObserver() {
       @Override
       public void beforeStart(ClientCallStreamObserver requestStream) {
-        requestStream.disableAutoRequest();
+        requestStream.disableAutoRequestWithInitial(0);
       }
 
       @Override
@@ -404,7 +404,7 @@ public void callStreamObserverPropagatesFlowControlRequestsToCall()
         new ClientResponseObserver() {
           @Override
           public void beforeStart(ClientCallStreamObserver requestStream) {
-            requestStream.disableAutoRequest();
+            requestStream.disableAutoRequestWithInitial(0);
           }
 
           @Override
@@ -452,7 +452,7 @@ class ResponseObserver implements ClientResponseObserver {
       @Override
       public void beforeStart(ClientCallStreamObserver requestStream) {
         this.requestStream = requestStream;
-        requestStream.disableAutoRequest();
+        requestStream.disableAutoRequestWithInitial(0);
       }
 
       @Override
@@ -538,7 +538,7 @@ public void onCompleted() {
         new ClientResponseObserver() {
           @Override
           public void beforeStart(final ClientCallStreamObserver requestStream) {
-            requestStream.disableAutoRequest();
+            requestStream.disableAutoRequestWithInitial(0);
           }
 
           @Override
@@ -586,7 +586,7 @@ public void inprocessTransportOutboundFlowControl() throws Exception {
               public StreamObserver invoke(StreamObserver responseObserver) {
                 final ServerCallStreamObserver serverCallObserver =
                     (ServerCallStreamObserver) responseObserver;
-                serverCallObserver.disableAutoRequest();
+                serverCallObserver.disableAutoRequestWithInitial(0);
                 observerFuture.set(serverCallObserver);
                 return new StreamObserver() {
                   @Override
diff --git a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java
index 93793ddfdd9..d67b9271c26 100644
--- a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java
+++ b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java
@@ -276,7 +276,7 @@ public StreamObserver invoke(StreamObserver responseObserver)
         callHandler.startCall(serverCall, new Metadata());
     callListener.onMessage(1);
     try {
-      callObserver.get().disableAutoRequest();
+      callObserver.get().disableAutoRequestWithInitial(0);
       fail("Cannot set onCancel handler after service invocation");
     } catch (IllegalStateException expected) {
       // Expected
@@ -316,7 +316,7 @@ public void disablingInboundAutoRequestSuppressesRequestsForMoreMessages() throw
               public StreamObserver invoke(StreamObserver responseObserver) {
                 ServerCallStreamObserver serverCallObserver =
                     (ServerCallStreamObserver) responseObserver;
-                serverCallObserver.disableAutoRequest();
+                serverCallObserver.disableAutoRequestWithInitial(0);
                 return new ServerCalls.NoopStreamObserver<>();
               }
             });
@@ -339,7 +339,7 @@ public void disablingInboundAutoRequestForUnaryHasNoEffect() throws Exception {
               public void invoke(Integer req, StreamObserver responseObserver) {
                 ServerCallStreamObserver serverCallObserver =
                     (ServerCallStreamObserver) responseObserver;
-                serverCallObserver.disableAutoRequest();
+                serverCallObserver.disableAutoRequestWithInitial(0);
               }
             });
     callHandler.startCall(serverCall, new Metadata());

From 14ce757ac1e28341b352f9ecd643edb9c0720160 Mon Sep 17 00:00:00 2001
From: DRayX <7531689+DRayX@users.noreply.github.com>
Date: Fri, 3 Apr 2020 18:01:09 -0700
Subject: [PATCH 18/24] Update ClientCalls.java

Fix accidental deletion
---
 stub/src/main/java/io/grpc/stub/ClientCalls.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java
index ab96c61e240..ddf5e29c65e 100644
--- a/stub/src/main/java/io/grpc/stub/ClientCalls.java
+++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java
@@ -274,7 +274,7 @@ private static  void asyncUnaryRequestCall(
         req,
         new StreamObserverToCallListenerAdapter<>(
             responseObserver,
-            new CallToStreamObserverAdapter<>(call, streamingResponse));
+            new CallToStreamObserverAdapter<>(call, streamingResponse)));
   }
 
   private static  void asyncUnaryRequestCall(

From b83f94f3d4bf739e680979bbc5d7c3c68afb3fcd Mon Sep 17 00:00:00 2001
From: DRayX <7531689+DRayX@users.noreply.github.com>
Date: Fri, 3 Apr 2020 18:06:48 -0700
Subject: [PATCH 19/24] Update ClientCalls.java

---
 stub/src/main/java/io/grpc/stub/ClientCalls.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java
index ddf5e29c65e..f992c73d2ad 100644
--- a/stub/src/main/java/io/grpc/stub/ClientCalls.java
+++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java
@@ -326,7 +326,7 @@ private static final class CallToStreamObserverAdapter extends ClientCallStre
     private boolean completed = false;
 
     // Non private to avoid synthetic class
-    CallToStreamObserverAdapter(ClientCall call) {
+    CallToStreamObserverAdapter(ClientCall call, boolean streamingResponse) {
       this.call = call;
       this.streamingResponse = streamingResponse;
     }

From 1839faf7641833f83ea168b1be8a8edb96f29acb Mon Sep 17 00:00:00 2001
From: DRayX <7531689+DRayX@users.noreply.github.com>
Date: Fri, 3 Apr 2020 18:21:50 -0700
Subject: [PATCH 20/24] Fix more typos

---
 stub/src/main/java/io/grpc/stub/ClientCalls.java | 4 ++--
 stub/src/main/java/io/grpc/stub/ServerCalls.java | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java
index f992c73d2ad..dcdce5471a1 100644
--- a/stub/src/main/java/io/grpc/stub/ClientCalls.java
+++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java
@@ -320,7 +320,7 @@ private static final class CallToStreamObserverAdapter extends ClientCallStre
     private final ClientCall call;
     private final boolean streamingResponse;
     private Runnable onReadyHandler;
-    private int initalRequest = 1;
+    private int initialRequest = 1;
     private boolean autoRequestEnabled = true;
     private boolean aborted = false;
     private boolean completed = false;
@@ -380,7 +380,7 @@ public void disableAutoRequestWithInitial(int request) {
         throw new IllegalStateException(
             "Cannot disable auto flow control after call started. Use ClientResponseObserver");
       }
-      Preconditions.checkArugment(request >= 0, "Initial requests must be non-negative");
+      Preconditions.checkArgument(request >= 0, "Initial requests must be non-negative");
       initialRequest = request;
       autoRequestEnabled = false;
     }
diff --git a/stub/src/main/java/io/grpc/stub/ServerCalls.java b/stub/src/main/java/io/grpc/stub/ServerCalls.java
index 0d40d325671..0379523ea2e 100644
--- a/stub/src/main/java/io/grpc/stub/ServerCalls.java
+++ b/stub/src/main/java/io/grpc/stub/ServerCalls.java
@@ -409,7 +409,7 @@ public void disableAutoInboundFlowControl() {
     @Override
     public void disableAutoRequestWithInitial(int request) {
       checkState(!frozen, "Cannot disable auto flow control after initialization");
-      checkArugment(request >= 0, "Initial requests must be non-negative");
+      checkArgument(request >= 0, "Initial requests must be non-negative");
       initialRequest = request;
       autoRequestEnabled = false;
     }

From 14d2276a5ae9441df2d364cc6e64cb5115b7b8ff Mon Sep 17 00:00:00 2001
From: DRayX <7531689+DRayX@users.noreply.github.com>
Date: Fri, 3 Apr 2020 18:25:06 -0700
Subject: [PATCH 21/24] Update ServerCalls.java

---
 stub/src/main/java/io/grpc/stub/ServerCalls.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/stub/src/main/java/io/grpc/stub/ServerCalls.java b/stub/src/main/java/io/grpc/stub/ServerCalls.java
index 0379523ea2e..cf1b92ed96f 100644
--- a/stub/src/main/java/io/grpc/stub/ServerCalls.java
+++ b/stub/src/main/java/io/grpc/stub/ServerCalls.java
@@ -409,7 +409,7 @@ public void disableAutoInboundFlowControl() {
     @Override
     public void disableAutoRequestWithInitial(int request) {
       checkState(!frozen, "Cannot disable auto flow control after initialization");
-      checkArgument(request >= 0, "Initial requests must be non-negative");
+      Preconditions.checkArgument(request >= 0, "Initial requests must be non-negative");
       initialRequest = request;
       autoRequestEnabled = false;
     }

From da1d10973257108f840fd82857fcb7361af7023c Mon Sep 17 00:00:00 2001
From: Eric Anderson 
Date: Mon, 4 May 2020 10:19:08 -0700
Subject: [PATCH 22/24] fix bug causing hanging servers

---
 stub/src/main/java/io/grpc/stub/ServerCalls.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/stub/src/main/java/io/grpc/stub/ServerCalls.java b/stub/src/main/java/io/grpc/stub/ServerCalls.java
index cf1b92ed96f..13dba8ea1d5 100644
--- a/stub/src/main/java/io/grpc/stub/ServerCalls.java
+++ b/stub/src/main/java/io/grpc/stub/ServerCalls.java
@@ -308,7 +308,7 @@ private static final class ServerCallStreamObserverImpl
     final ServerCall call;
     volatile boolean cancelled;
     private boolean frozen;
-    private int initialRequest;
+    private int initialRequest = 1;
     private boolean autoRequestEnabled = true;
     private boolean sentHeaders;
     private Runnable onReadyHandler;

From 371bb887651d3d521511ded6cbe50f619f9155e6 Mon Sep 17 00:00:00 2001
From: DRayX <7531689+DRayX@users.noreply.github.com>
Date: Tue, 5 May 2020 10:31:50 -0700
Subject: [PATCH 23/24] Remove `@Deprecated` from disableAutoInboundFlowControl

---
 stub/src/main/java/io/grpc/stub/CallStreamObserver.java | 6 ++----
 stub/src/test/java/io/grpc/stub/ClientCallsTest.java    | 1 -
 stub/src/test/java/io/grpc/stub/ServerCallsTest.java    | 1 -
 3 files changed, 2 insertions(+), 6 deletions(-)

diff --git a/stub/src/main/java/io/grpc/stub/CallStreamObserver.java b/stub/src/main/java/io/grpc/stub/CallStreamObserver.java
index 1e9c68eb8da..479ddfc81ca 100644
--- a/stub/src/main/java/io/grpc/stub/CallStreamObserver.java
+++ b/stub/src/main/java/io/grpc/stub/CallStreamObserver.java
@@ -112,11 +112,7 @@ public abstract class CallStreamObserver implements StreamObserver {
    * already disables all inbound requests.  On the client side, {@code
    * disableAutoRequestWithInitial(1)} should be called to maintain existing behavior as
    * {@code disableAutoInboundFlowControl} does not disable the initial request.
-   * 
-   * @deprecated Use {@link #disableAutoRequest} instead (see note above about migrating). This
-   *     method will be removed.
    */
-  @Deprecated
   public abstract void disableAutoInboundFlowControl();
 
   /**
@@ -136,6 +132,8 @@ public abstract class CallStreamObserver implements StreamObserver {
    *   
  • {@link io.grpc.MethodDescriptor.MethodType#SERVER_STREAMING} operations.
  • * *

    + +

    This API is still a work in-progress and will likely change in the future. */ public abstract void disableAutoRequestWithInitial(int request); diff --git a/stub/src/test/java/io/grpc/stub/ClientCallsTest.java b/stub/src/test/java/io/grpc/stub/ClientCallsTest.java index e1f76294c97..b9c8ca0115f 100644 --- a/stub/src/test/java/io/grpc/stub/ClientCallsTest.java +++ b/stub/src/test/java/io/grpc/stub/ClientCallsTest.java @@ -331,7 +331,6 @@ public void request(int numMessages) { }; ClientCalls.asyncBidiStreamingCall(call, new ClientResponseObserver() { @Override - @SuppressWarnings("deprecation") public void beforeStart(ClientCallStreamObserver requestStream) { requestStream.disableAutoInboundFlowControl(); } diff --git a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java index d67b9271c26..775c86717d0 100644 --- a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java +++ b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java @@ -289,7 +289,6 @@ public void disablingInboundAutoFlowControlSuppressesRequestsForMoreMessages() t ServerCalls.asyncBidiStreamingCall( new ServerCalls.BidiStreamingMethod() { @Override - @SuppressWarnings("deprecation") public StreamObserver invoke(StreamObserver responseObserver) { ServerCallStreamObserver serverCallObserver = (ServerCallStreamObserver) responseObserver; From 23d5b17c5964f2f25f2f286ab8ca97b11fbdd7b6 Mon Sep 17 00:00:00 2001 From: DRayX <7531689+DRayX@users.noreply.github.com> Date: Tue, 5 May 2020 11:07:27 -0700 Subject: [PATCH 24/24] Fix style --- stub/src/main/java/io/grpc/stub/CallStreamObserver.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/stub/src/main/java/io/grpc/stub/CallStreamObserver.java b/stub/src/main/java/io/grpc/stub/CallStreamObserver.java index 479ddfc81ca..d24f835aec4 100644 --- a/stub/src/main/java/io/grpc/stub/CallStreamObserver.java +++ b/stub/src/main/java/io/grpc/stub/CallStreamObserver.java @@ -132,8 +132,8 @@ public abstract class CallStreamObserver implements StreamObserver { *

  • {@link io.grpc.MethodDescriptor.MethodType#SERVER_STREAMING} operations.
  • * *

    - -

    This API is still a work in-progress and will likely change in the future. + * + *

    This API is still a work in-progress and will likely change in the future. */ public abstract void disableAutoRequestWithInitial(int request);