From b68d40329c45f646ea729d0d79cf1a0bdde80ca7 Mon Sep 17 00:00:00 2001 From: Piotr Morgwai Kotarbinski Date: Thu, 26 Aug 2021 17:35:50 +0700 Subject: [PATCH 01/11] stub: add ServerCallStreamObserver.setOnSuccessHandler(...) (#5895) This allows for user code to be notified when the messages are actually put on the wire and the stream is closed. --- .../grpc/stub/ServerCallStreamObserver.java | 17 ++++++ .../main/java/io/grpc/stub/ServerCalls.java | 23 ++++++++ .../java/io/grpc/stub/ServerCallsTest.java | 56 +++++++++++++++++++ 3 files changed, 96 insertions(+) diff --git a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java index 3ba1bf563ef..8b19335f526 100644 --- a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java +++ b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java @@ -143,4 +143,21 @@ public void disableAutoRequest() { */ @Override public abstract void setMessageCompression(boolean enable); + + /** + * Sets a {@link Runnable} to be called when the call is successfully completed from the server's + * point of view: all the messages have been put on the wire and the stream has been closed. + * Note however that the client still may have not received all the messages due to race + * conditions or crashes. + * + *

It is guaranteed that execution of the {@link Runnable} is serialized with calls to the + * 'inbound' {@link StreamObserver}. That also means that the callback will be delayed if other + * callbacks are running.

+ * + *

This method may only be called during the initial call to the application, before the + * service returns its {@code StreamObserver}.

+ * + * @param onSuccessHandler to call when the call has been successfully completed + */ + public abstract void setOnSuccessHandler(Runnable onSuccessHandler); } diff --git a/stub/src/main/java/io/grpc/stub/ServerCalls.java b/stub/src/main/java/io/grpc/stub/ServerCalls.java index ba08139b716..1d96d06078f 100644 --- a/stub/src/main/java/io/grpc/stub/ServerCalls.java +++ b/stub/src/main/java/io/grpc/stub/ServerCalls.java @@ -206,6 +206,13 @@ public void onReady() { responseObserver.onReadyHandler.run(); } } + + @Override + public void onComplete() { + if (responseObserver.onSuccessHandler != null) { + responseObserver.onSuccessHandler.run(); + } + } } } @@ -291,6 +298,13 @@ public void onReady() { responseObserver.onReadyHandler.run(); } } + + @Override + public void onComplete() { + if (responseObserver.onSuccessHandler != null) { + responseObserver.onSuccessHandler.run(); + } + } } } @@ -320,6 +334,7 @@ private static final class ServerCallStreamObserverImpl private Runnable onCancelHandler; private boolean aborted = false; private boolean completed = false; + private Runnable onSuccessHandler; // Non private to avoid synthetic class ServerCallStreamObserverImpl(ServerCall call, boolean serverStreamingOrBidi) { @@ -423,6 +438,14 @@ public void disableAutoRequest() { public void request(int count) { call.request(count); } + + @Override + public void setOnSuccessHandler(Runnable onSuccessHandler) { + checkState(!frozen, "Cannot alter onSuccessHandler after initialization. May only be called " + + "during the initial call to the application, before the service returns its " + + "StreamObserver"); + this.onSuccessHandler = onSuccessHandler; + } } /** diff --git a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java index a2a1ef93961..8ff7fbc43ab 100644 --- a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java +++ b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java @@ -199,6 +199,34 @@ public StreamObserver invoke(StreamObserver responseObserver) callObserver.get().onCompleted(); } + @Test + public void onSuccessHandlerCalledIfSet() throws Exception { + final AtomicBoolean onSuccess = new AtomicBoolean(); + final AtomicReference> callObserver = + new AtomicReference<>(); + ServerCallHandler callHandler = + ServerCalls.asyncBidiStreamingCall( + new ServerCalls.BidiStreamingMethod() { + @Override + public StreamObserver invoke(StreamObserver responseObserver) { + ServerCallStreamObserver serverCallObserver = + (ServerCallStreamObserver) responseObserver; + callObserver.set(serverCallObserver); + serverCallObserver.setOnSuccessHandler(new Runnable() { + @Override + public void run() { + onSuccess.set(true); + } + }); + return new ServerCalls.NoopStreamObserver<>(); + } + }); + ServerCall.Listener callListener = + callHandler.startCall(serverCall, new Metadata()); + callListener.onComplete(); + assertTrue(onSuccess.get()); + } + @Test public void cannotSetOnCancelHandlerAfterServiceInvocation() throws Exception { final AtomicReference> callObserver = @@ -255,6 +283,34 @@ public void run() { } } + @Test + public void cannotSetOnSuccessHandlerAfterServiceInvocation() throws Exception { + final AtomicReference> callObserver = + new AtomicReference<>(); + ServerCallHandler callHandler = + ServerCalls.asyncBidiStreamingCall( + new ServerCalls.BidiStreamingMethod() { + @Override + public StreamObserver invoke(StreamObserver responseObserver) { + callObserver.set((ServerCallStreamObserver) responseObserver); + return new ServerCalls.NoopStreamObserver<>(); + } + }); + ServerCall.Listener callListener = + callHandler.startCall(serverCall, new Metadata()); + callListener.onMessage(1); + try { + callObserver.get().setOnSuccessHandler(new Runnable() { + @Override + public void run() { + } + }); + fail("Cannot set onReady after service invocation"); + } catch (IllegalStateException expected) { + // Expected + } + } + @Test public void cannotDisableAutoRequestAfterServiceInvocation() throws Exception { final AtomicReference> callObserver = From ec8c489d2f15436a75471173d78fb5782d32a9be Mon Sep 17 00:00:00 2001 From: Piotr Morgwai Kotarbinski Date: Wed, 1 Sep 2021 22:17:08 +0700 Subject: [PATCH 02/11] stub: rename onSuccessHandler to onFinishHandler The handler is called also after onError(...), so onSuccess sounds confusing in this situation. --- .../io/grpc/stub/ServerCallStreamObserver.java | 15 +++++++++++---- stub/src/main/java/io/grpc/stub/ServerCalls.java | 16 ++++++++-------- .../test/java/io/grpc/stub/ServerCallsTest.java | 14 +++++++------- 3 files changed, 26 insertions(+), 19 deletions(-) diff --git a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java index 8b19335f526..1815725d536 100644 --- a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java +++ b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java @@ -145,11 +145,18 @@ public void disableAutoRequest() { public abstract void setMessageCompression(boolean enable); /** - * Sets a {@link Runnable} to be called when the call is successfully completed from the server's - * point of view: all the messages have been put on the wire and the stream has been closed. + * Sets a {@link Runnable} to be executed when the call is correctly finished from the server's + * point of view: either {@link #onCompleted()} or {@link #onError(Throwable)} has been called, + * all the messages and trailing metadata have been put on the wire and the stream has been + * closed. * Note however that the client still may have not received all the messages due to race * conditions or crashes. * + *

+ * Unless server exits abruptly, if both {@code onFinishHandler} and {@code onCancelHandler} are + * set, then after a call to either {@link #onCompleted()} or {@link #onError(Throwable)} exactly + * 1 of {@code onFinishHandler} and {@code onCancelHandler} will be called eventually.

+ * *

It is guaranteed that execution of the {@link Runnable} is serialized with calls to the * 'inbound' {@link StreamObserver}. That also means that the callback will be delayed if other * callbacks are running.

@@ -157,7 +164,7 @@ public void disableAutoRequest() { *

This method may only be called during the initial call to the application, before the * service returns its {@code StreamObserver}.

* - * @param onSuccessHandler to call when the call has been successfully completed + * @param onFinishHandler to call when the call has been correctly finished. */ - public abstract void setOnSuccessHandler(Runnable onSuccessHandler); + public abstract void setOnFinishHandler(Runnable onFinishHandler); } diff --git a/stub/src/main/java/io/grpc/stub/ServerCalls.java b/stub/src/main/java/io/grpc/stub/ServerCalls.java index 1d96d06078f..5fb8a8d7f2c 100644 --- a/stub/src/main/java/io/grpc/stub/ServerCalls.java +++ b/stub/src/main/java/io/grpc/stub/ServerCalls.java @@ -209,8 +209,8 @@ public void onReady() { @Override public void onComplete() { - if (responseObserver.onSuccessHandler != null) { - responseObserver.onSuccessHandler.run(); + if (responseObserver.onFinishHandler != null) { + responseObserver.onFinishHandler.run(); } } } @@ -301,8 +301,8 @@ public void onReady() { @Override public void onComplete() { - if (responseObserver.onSuccessHandler != null) { - responseObserver.onSuccessHandler.run(); + if (responseObserver.onFinishHandler != null) { + responseObserver.onFinishHandler.run(); } } } @@ -334,7 +334,7 @@ private static final class ServerCallStreamObserverImpl private Runnable onCancelHandler; private boolean aborted = false; private boolean completed = false; - private Runnable onSuccessHandler; + private Runnable onFinishHandler; // Non private to avoid synthetic class ServerCallStreamObserverImpl(ServerCall call, boolean serverStreamingOrBidi) { @@ -440,11 +440,11 @@ public void request(int count) { } @Override - public void setOnSuccessHandler(Runnable onSuccessHandler) { - checkState(!frozen, "Cannot alter onSuccessHandler after initialization. May only be called " + public void setOnFinishHandler(Runnable onFinishHandler) { + checkState(!frozen, "Cannot alter onFinishHandler after initialization. May only be called " + "during the initial call to the application, before the service returns its " + "StreamObserver"); - this.onSuccessHandler = onSuccessHandler; + this.onFinishHandler = onFinishHandler; } } diff --git a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java index 8ff7fbc43ab..82bb31aa0d3 100644 --- a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java +++ b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java @@ -200,8 +200,8 @@ public StreamObserver invoke(StreamObserver responseObserver) } @Test - public void onSuccessHandlerCalledIfSet() throws Exception { - final AtomicBoolean onSuccess = new AtomicBoolean(); + public void onFinishHandlerCalledIfSet() throws Exception { + final AtomicBoolean onFinish = new AtomicBoolean(); final AtomicReference> callObserver = new AtomicReference<>(); ServerCallHandler callHandler = @@ -212,10 +212,10 @@ public StreamObserver invoke(StreamObserver responseObserver) ServerCallStreamObserver serverCallObserver = (ServerCallStreamObserver) responseObserver; callObserver.set(serverCallObserver); - serverCallObserver.setOnSuccessHandler(new Runnable() { + serverCallObserver.setOnFinishHandler(new Runnable() { @Override public void run() { - onSuccess.set(true); + onFinish.set(true); } }); return new ServerCalls.NoopStreamObserver<>(); @@ -224,7 +224,7 @@ public void run() { ServerCall.Listener callListener = callHandler.startCall(serverCall, new Metadata()); callListener.onComplete(); - assertTrue(onSuccess.get()); + assertTrue(onFinish.get()); } @Test @@ -284,7 +284,7 @@ public void run() { } @Test - public void cannotSetOnSuccessHandlerAfterServiceInvocation() throws Exception { + public void cannotSetOnFinishHandlerAfterServiceInvocation() throws Exception { final AtomicReference> callObserver = new AtomicReference<>(); ServerCallHandler callHandler = @@ -300,7 +300,7 @@ public StreamObserver invoke(StreamObserver responseObserver) callHandler.startCall(serverCall, new Metadata()); callListener.onMessage(1); try { - callObserver.get().setOnSuccessHandler(new Runnable() { + callObserver.get().setOnFinishHandler(new Runnable() { @Override public void run() { } From 84b4e233855def3d99dfa9386fdb048c0e296330 Mon Sep 17 00:00:00 2001 From: Piotr Morgwai Kotarbinski Date: Wed, 1 Sep 2021 22:45:33 +0700 Subject: [PATCH 03/11] stub: add @ExperimentalApi to setOnFinishHandler() --- stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java index 1815725d536..c94ec16490a 100644 --- a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java +++ b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java @@ -16,6 +16,8 @@ package io.grpc.stub; +import io.grpc.ExperimentalApi; + /** * A refinement of {@link CallStreamObserver} to allows for interaction with call * cancellation events on the server side. @@ -166,5 +168,6 @@ public void disableAutoRequest() { * * @param onFinishHandler to call when the call has been correctly finished. */ + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/8467") public abstract void setOnFinishHandler(Runnable onFinishHandler); } From aecf3832990b30e9ab3e441d302fe86a2e83b6c4 Mon Sep 17 00:00:00 2001 From: Piotr Morgwai Kotarbinski Date: Thu, 2 Sep 2021 14:47:13 +0700 Subject: [PATCH 04/11] stub: add test for setOnFinishHandler() in unary client case --- .../java/io/grpc/stub/ServerCallsTest.java | 61 +++++++++++++------ 1 file changed, 42 insertions(+), 19 deletions(-) diff --git a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java index 82bb31aa0d3..7e1d597a8da 100644 --- a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java +++ b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java @@ -200,29 +200,52 @@ public StreamObserver invoke(StreamObserver responseObserver) } @Test - public void onFinishHandlerCalledIfSet() throws Exception { + public void onFinishHandlerCalledIfSetInStreamingClientCall() throws Exception { final AtomicBoolean onFinish = new AtomicBoolean(); - final AtomicReference> callObserver = - new AtomicReference<>(); - ServerCallHandler callHandler = - ServerCalls.asyncBidiStreamingCall( - new ServerCalls.BidiStreamingMethod() { + final AtomicReference> callObserver = new AtomicReference<>(); + ServerCallHandler callHandler = ServerCalls.asyncBidiStreamingCall( + new ServerCalls.BidiStreamingMethod() { + @Override + public StreamObserver invoke(StreamObserver responseObserver) { + ServerCallStreamObserver serverCallObserver = + (ServerCallStreamObserver) responseObserver; + callObserver.set(serverCallObserver); + serverCallObserver.setOnFinishHandler(new Runnable() { @Override - public StreamObserver invoke(StreamObserver responseObserver) { - ServerCallStreamObserver serverCallObserver = - (ServerCallStreamObserver) responseObserver; - callObserver.set(serverCallObserver); - serverCallObserver.setOnFinishHandler(new Runnable() { - @Override - public void run() { - onFinish.set(true); - } - }); - return new ServerCalls.NoopStreamObserver<>(); + public void run() { + onFinish.set(true); } }); - ServerCall.Listener callListener = - callHandler.startCall(serverCall, new Metadata()); + return new ServerCalls.NoopStreamObserver<>(); + } + }); + ServerCall.Listener callListener = callHandler.startCall(serverCall, new Metadata()); + callListener.onComplete(); + assertTrue(onFinish.get()); + } + + @Test + public void onFinishHandlerCalledIfSetInUnaryClientCall() throws Exception { + final AtomicBoolean onFinish = new AtomicBoolean(); + final AtomicReference> callObserver = new AtomicReference<>(); + ServerCallHandler callHandler = ServerCalls.asyncServerStreamingCall( + new ServerCalls.ServerStreamingMethod() { + @Override + public void invoke(Integer request, StreamObserver responseObserver) { + ServerCallStreamObserver serverCallObserver = + (ServerCallStreamObserver) responseObserver; + callObserver.set(serverCallObserver); + serverCallObserver.setOnFinishHandler(new Runnable() { + @Override + public void run() { + onFinish.set(true); + } + }); + } + }); + ServerCall.Listener callListener = callHandler.startCall(serverCall, new Metadata()); + callListener.onMessage(0); + callListener.onHalfClose(); callListener.onComplete(); assertTrue(onFinish.get()); } From 4408a91230d80ea65e0f23b35e61b38f9241aa2a Mon Sep 17 00:00:00 2001 From: Piotr Morgwai Kotarbinski Date: Thu, 2 Sep 2021 15:05:24 +0700 Subject: [PATCH 05/11] stub: fix formatting in cannotSetOnFinishHandlerAfterServiceInvocation --- .../java/io/grpc/stub/ServerCallsTest.java | 23 ++++++++----------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java index 7e1d597a8da..9bb738b237d 100644 --- a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java +++ b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java @@ -308,19 +308,16 @@ public void run() { @Test public void cannotSetOnFinishHandlerAfterServiceInvocation() throws Exception { - final AtomicReference> callObserver = - new AtomicReference<>(); - ServerCallHandler callHandler = - ServerCalls.asyncBidiStreamingCall( - new ServerCalls.BidiStreamingMethod() { - @Override - public StreamObserver invoke(StreamObserver responseObserver) { - callObserver.set((ServerCallStreamObserver) responseObserver); - return new ServerCalls.NoopStreamObserver<>(); - } - }); - ServerCall.Listener callListener = - callHandler.startCall(serverCall, new Metadata()); + final AtomicReference> callObserver = new AtomicReference<>(); + ServerCallHandler callHandler = ServerCalls.asyncBidiStreamingCall( + new ServerCalls.BidiStreamingMethod() { + @Override + public StreamObserver invoke(StreamObserver responseObserver) { + callObserver.set((ServerCallStreamObserver) responseObserver); + return new ServerCalls.NoopStreamObserver<>(); + } + }); + ServerCall.Listener callListener = callHandler.startCall(serverCall, new Metadata()); callListener.onMessage(1); try { callObserver.get().setOnFinishHandler(new Runnable() { From 2690d5c9e89e9a448cc6cb2ce1062d1453d1cb4c Mon Sep 17 00:00:00 2001 From: Piotr Morgwai Kotarbinski Date: Thu, 2 Sep 2021 15:18:55 +0700 Subject: [PATCH 06/11] stub: remove unused var from setOnFinishHandler(...) tests --- stub/src/test/java/io/grpc/stub/ServerCallsTest.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java index 9bb738b237d..39f17979e8b 100644 --- a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java +++ b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java @@ -202,14 +202,12 @@ public StreamObserver invoke(StreamObserver responseObserver) @Test public void onFinishHandlerCalledIfSetInStreamingClientCall() throws Exception { final AtomicBoolean onFinish = new AtomicBoolean(); - final AtomicReference> callObserver = new AtomicReference<>(); ServerCallHandler callHandler = ServerCalls.asyncBidiStreamingCall( new ServerCalls.BidiStreamingMethod() { @Override public StreamObserver invoke(StreamObserver responseObserver) { ServerCallStreamObserver serverCallObserver = (ServerCallStreamObserver) responseObserver; - callObserver.set(serverCallObserver); serverCallObserver.setOnFinishHandler(new Runnable() { @Override public void run() { @@ -227,14 +225,12 @@ public void run() { @Test public void onFinishHandlerCalledIfSetInUnaryClientCall() throws Exception { final AtomicBoolean onFinish = new AtomicBoolean(); - final AtomicReference> callObserver = new AtomicReference<>(); ServerCallHandler callHandler = ServerCalls.asyncServerStreamingCall( new ServerCalls.ServerStreamingMethod() { @Override public void invoke(Integer request, StreamObserver responseObserver) { ServerCallStreamObserver serverCallObserver = (ServerCallStreamObserver) responseObserver; - callObserver.set(serverCallObserver); serverCallObserver.setOnFinishHandler(new Runnable() { @Override public void run() { From 3429e67fe229fe6a613eef9fea2b86042622d24b Mon Sep 17 00:00:00 2001 From: Piotr Morgwai Kotarbinski Date: Thu, 2 Sep 2021 16:23:09 +0700 Subject: [PATCH 07/11] stub: fix javadoc of setOnFinishHandler(...) Fix some formatting and references. --- .../java/io/grpc/stub/ServerCallStreamObserver.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java index c94ec16490a..7dae00961f7 100644 --- a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java +++ b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java @@ -154,14 +154,13 @@ public void disableAutoRequest() { * Note however that the client still may have not received all the messages due to race * conditions or crashes. * - *

- * Unless server exits abruptly, if both {@code onFinishHandler} and {@code onCancelHandler} are - * set, then after a call to either {@link #onCompleted()} or {@link #onError(Throwable)} exactly - * 1 of {@code onFinishHandler} and {@code onCancelHandler} will be called eventually.

+ *

Unless server exits abruptly, if both {@code onFinishHandler} and {@code onCancelHandler} + * are set, then after a call to either {@link #onCompleted()} or {@link #onError(Throwable)} + * exactly 1 of {@code onFinishHandler} and {@code onCancelHandler} will be called eventually.

* - *

It is guaranteed that execution of the {@link Runnable} is serialized with calls to the - * 'inbound' {@link StreamObserver}. That also means that the callback will be delayed if other - * callbacks are running.

+ *

It is guaranteed that execution of the {@code onFinishHandler} is serialized with calls to + * the 'inbound' {@link StreamObserver}. That also means that the callback will be delayed if + * other callbacks are running.

* *

This method may only be called during the initial call to the application, before the * service returns its {@code StreamObserver}.

From b733f209d60abb3bc13664958519592321114093 Mon Sep 17 00:00:00 2001 From: Piotr Morgwai Kotarbinski Date: Fri, 3 Sep 2021 13:13:28 +0700 Subject: [PATCH 08/11] stub: rename onFinishHandler to onCloseHandler --- .../grpc/stub/ServerCallStreamObserver.java | 14 +++++------ .../main/java/io/grpc/stub/ServerCalls.java | 16 ++++++------- .../java/io/grpc/stub/ServerCallsTest.java | 24 +++++++++---------- 3 files changed, 27 insertions(+), 27 deletions(-) diff --git a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java index 7dae00961f7..77ebef840d5 100644 --- a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java +++ b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java @@ -147,26 +147,26 @@ public void disableAutoRequest() { public abstract void setMessageCompression(boolean enable); /** - * Sets a {@link Runnable} to be executed when the call is correctly finished from the server's + * Sets a {@link Runnable} to be executed when the call is closed correctly from the server's * point of view: either {@link #onCompleted()} or {@link #onError(Throwable)} has been called, * all the messages and trailing metadata have been put on the wire and the stream has been * closed. * Note however that the client still may have not received all the messages due to race * conditions or crashes. * - *

Unless server exits abruptly, if both {@code onFinishHandler} and {@code onCancelHandler} + *

Unless server exits abruptly, if both {@code onCloseHandler} and {@code onCancelHandler} * are set, then after a call to either {@link #onCompleted()} or {@link #onError(Throwable)} - * exactly 1 of {@code onFinishHandler} and {@code onCancelHandler} will be called eventually.

+ * exactly 1 of {@code onCloseHandler} and {@code onCancelHandler} will be called eventually.

* - *

It is guaranteed that execution of the {@code onFinishHandler} is serialized with calls to + *

It is guaranteed that execution of {@code onCloseHandler} is serialized with calls to * the 'inbound' {@link StreamObserver}. That also means that the callback will be delayed if * other callbacks are running.

* *

This method may only be called during the initial call to the application, before the - * service returns its {@code StreamObserver}.

+ * service returns its {@link StreamObserver request observer}.

* - * @param onFinishHandler to call when the call has been correctly finished. + * @param onCloseHandler to execute when the call has been closed correctly. */ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/8467") - public abstract void setOnFinishHandler(Runnable onFinishHandler); + public abstract void setOnCloseHandler(Runnable onCloseHandler); } diff --git a/stub/src/main/java/io/grpc/stub/ServerCalls.java b/stub/src/main/java/io/grpc/stub/ServerCalls.java index 5fb8a8d7f2c..89e738adab1 100644 --- a/stub/src/main/java/io/grpc/stub/ServerCalls.java +++ b/stub/src/main/java/io/grpc/stub/ServerCalls.java @@ -209,8 +209,8 @@ public void onReady() { @Override public void onComplete() { - if (responseObserver.onFinishHandler != null) { - responseObserver.onFinishHandler.run(); + if (responseObserver.onCloseHandler != null) { + responseObserver.onCloseHandler.run(); } } } @@ -301,8 +301,8 @@ public void onReady() { @Override public void onComplete() { - if (responseObserver.onFinishHandler != null) { - responseObserver.onFinishHandler.run(); + if (responseObserver.onCloseHandler != null) { + responseObserver.onCloseHandler.run(); } } } @@ -334,7 +334,7 @@ private static final class ServerCallStreamObserverImpl private Runnable onCancelHandler; private boolean aborted = false; private boolean completed = false; - private Runnable onFinishHandler; + private Runnable onCloseHandler; // Non private to avoid synthetic class ServerCallStreamObserverImpl(ServerCall call, boolean serverStreamingOrBidi) { @@ -440,11 +440,11 @@ public void request(int count) { } @Override - public void setOnFinishHandler(Runnable onFinishHandler) { - checkState(!frozen, "Cannot alter onFinishHandler after initialization. May only be called " + public void setOnCloseHandler(Runnable onCloseHandler) { + checkState(!frozen, "Cannot alter onCloseHandler after initialization. May only be called " + "during the initial call to the application, before the service returns its " + "StreamObserver"); - this.onFinishHandler = onFinishHandler; + this.onCloseHandler = onCloseHandler; } } diff --git a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java index 39f17979e8b..7227d26c5b8 100644 --- a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java +++ b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java @@ -200,18 +200,18 @@ public StreamObserver invoke(StreamObserver responseObserver) } @Test - public void onFinishHandlerCalledIfSetInStreamingClientCall() throws Exception { - final AtomicBoolean onFinish = new AtomicBoolean(); + public void onCloseHandlerCalledIfSetInStreamingClientCall() throws Exception { + final AtomicBoolean onCloseHandlerCalled = new AtomicBoolean(); ServerCallHandler callHandler = ServerCalls.asyncBidiStreamingCall( new ServerCalls.BidiStreamingMethod() { @Override public StreamObserver invoke(StreamObserver responseObserver) { ServerCallStreamObserver serverCallObserver = (ServerCallStreamObserver) responseObserver; - serverCallObserver.setOnFinishHandler(new Runnable() { + serverCallObserver.setOnCloseHandler(new Runnable() { @Override public void run() { - onFinish.set(true); + onCloseHandlerCalled.set(true); } }); return new ServerCalls.NoopStreamObserver<>(); @@ -219,22 +219,22 @@ public void run() { }); ServerCall.Listener callListener = callHandler.startCall(serverCall, new Metadata()); callListener.onComplete(); - assertTrue(onFinish.get()); + assertTrue(onCloseHandlerCalled.get()); } @Test - public void onFinishHandlerCalledIfSetInUnaryClientCall() throws Exception { - final AtomicBoolean onFinish = new AtomicBoolean(); + public void onCloseHandlerCalledIfSetInUnaryClientCall() throws Exception { + final AtomicBoolean onCloseHandlerCalled = new AtomicBoolean(); ServerCallHandler callHandler = ServerCalls.asyncServerStreamingCall( new ServerCalls.ServerStreamingMethod() { @Override public void invoke(Integer request, StreamObserver responseObserver) { ServerCallStreamObserver serverCallObserver = (ServerCallStreamObserver) responseObserver; - serverCallObserver.setOnFinishHandler(new Runnable() { + serverCallObserver.setOnCloseHandler(new Runnable() { @Override public void run() { - onFinish.set(true); + onCloseHandlerCalled.set(true); } }); } @@ -243,7 +243,7 @@ public void run() { callListener.onMessage(0); callListener.onHalfClose(); callListener.onComplete(); - assertTrue(onFinish.get()); + assertTrue(onCloseHandlerCalled.get()); } @Test @@ -303,7 +303,7 @@ public void run() { } @Test - public void cannotSetOnFinishHandlerAfterServiceInvocation() throws Exception { + public void cannotSetOnCloseHandlerAfterServiceInvocation() throws Exception { final AtomicReference> callObserver = new AtomicReference<>(); ServerCallHandler callHandler = ServerCalls.asyncBidiStreamingCall( new ServerCalls.BidiStreamingMethod() { @@ -316,7 +316,7 @@ public StreamObserver invoke(StreamObserver responseObserver) ServerCall.Listener callListener = callHandler.startCall(serverCall, new Metadata()); callListener.onMessage(1); try { - callObserver.get().setOnFinishHandler(new Runnable() { + callObserver.get().setOnCloseHandler(new Runnable() { @Override public void run() { } From 9720b63eb9985710be3ed64f3938a5a289bd689d Mon Sep 17 00:00:00 2001 From: Piotr Morgwai Kotarbinski Date: Sat, 4 Sep 2021 14:48:25 +0700 Subject: [PATCH 09/11] stub: apply review comments to setOnCloseHandler() javadoc --- .../main/java/io/grpc/stub/ServerCallStreamObserver.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java index 77ebef840d5..ae5c65877e3 100644 --- a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java +++ b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java @@ -147,12 +147,11 @@ public void disableAutoRequest() { public abstract void setMessageCompression(boolean enable); /** - * Sets a {@link Runnable} to be executed when the call is closed correctly from the server's + * Sets a {@link Runnable} to be executed when the call is closed cleanly from the server's * point of view: either {@link #onCompleted()} or {@link #onError(Throwable)} has been called, * all the messages and trailing metadata have been put on the wire and the stream has been - * closed. - * Note however that the client still may have not received all the messages due to race - * conditions or crashes. + * closed. Note however that the client still may have not received all the messages due to + * network delay, client crashes, and cancellation races. * *

Unless server exits abruptly, if both {@code onCloseHandler} and {@code onCancelHandler} * are set, then after a call to either {@link #onCompleted()} or {@link #onError(Throwable)} @@ -165,7 +164,7 @@ public void disableAutoRequest() { *

This method may only be called during the initial call to the application, before the * service returns its {@link StreamObserver request observer}.

* - * @param onCloseHandler to execute when the call has been closed correctly. + * @param onCloseHandler to execute when the call has been closed cleanly. */ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/8467") public abstract void setOnCloseHandler(Runnable onCloseHandler); From 597c8aee1884b79a1d56e6fa4c134c95bfdfc871 Mon Sep 17 00:00:00 2001 From: Piotr Morgwai Kotarbinski Date: Sat, 4 Sep 2021 14:58:17 +0700 Subject: [PATCH 10/11] stub: apply review comments to setOnCloseHandler() javadoc, part 2 --- .../src/main/java/io/grpc/stub/ServerCallStreamObserver.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java index ae5c65877e3..0178e62a35a 100644 --- a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java +++ b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java @@ -153,9 +153,8 @@ public void disableAutoRequest() { * closed. Note however that the client still may have not received all the messages due to * network delay, client crashes, and cancellation races. * - *

Unless server exits abruptly, if both {@code onCloseHandler} and {@code onCancelHandler} - * are set, then after a call to either {@link #onCompleted()} or {@link #onError(Throwable)} - * exactly 1 of {@code onCloseHandler} and {@code onCancelHandler} will be called eventually.

+ *

Exactly one of {@code onCloseHandler} and {@code onCancelHandler} is guaranteed to be called + * when the RPC terminates.

* *

It is guaranteed that execution of {@code onCloseHandler} is serialized with calls to * the 'inbound' {@link StreamObserver}. That also means that the callback will be delayed if From dd2bb58cb6586da984d250ecc019aa8ff2faf85c Mon Sep 17 00:00:00 2001 From: Piotr Morgwai Kotarbinski Date: Sat, 4 Sep 2021 15:55:46 +0700 Subject: [PATCH 11/11] stub: apply review comments to setOnCloseHandler() javadoc, part 3 --- .../main/java/io/grpc/stub/ServerCallStreamObserver.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java index 0178e62a35a..18255ddd6d4 100644 --- a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java +++ b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java @@ -149,9 +149,9 @@ public void disableAutoRequest() { /** * Sets a {@link Runnable} to be executed when the call is closed cleanly from the server's * point of view: either {@link #onCompleted()} or {@link #onError(Throwable)} has been called, - * all the messages and trailing metadata have been put on the wire and the stream has been - * closed. Note however that the client still may have not received all the messages due to - * network delay, client crashes, and cancellation races. + * all the messages and trailing metadata have been sent and the stream has been closed. Note + * however that the client still may have not received all the messages due to network delay, + * client crashes, and cancellation races. * *

Exactly one of {@code onCloseHandler} and {@code onCancelHandler} is guaranteed to be called * when the RPC terminates.