From b68d40329c45f646ea729d0d79cf1a0bdde80ca7 Mon Sep 17 00:00:00 2001
From: Piotr Morgwai Kotarbinski
Date: Thu, 26 Aug 2021 17:35:50 +0700
Subject: [PATCH 01/11] stub: add
ServerCallStreamObserver.setOnSuccessHandler(...) (#5895)
This allows for user code to be notified when the messages are actually
put on the wire and the stream is closed.
---
.../grpc/stub/ServerCallStreamObserver.java | 17 ++++++
.../main/java/io/grpc/stub/ServerCalls.java | 23 ++++++++
.../java/io/grpc/stub/ServerCallsTest.java | 56 +++++++++++++++++++
3 files changed, 96 insertions(+)
diff --git a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
index 3ba1bf563ef..8b19335f526 100644
--- a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
+++ b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
@@ -143,4 +143,21 @@ public void disableAutoRequest() {
*/
@Override
public abstract void setMessageCompression(boolean enable);
+
+ /**
+ * Sets a {@link Runnable} to be called when the call is successfully completed from the server's
+ * point of view: all the messages have been put on the wire and the stream has been closed.
+ * Note however that the client still may have not received all the messages due to race
+ * conditions or crashes.
+ *
+ * It is guaranteed that execution of the {@link Runnable} is serialized with calls to the
+ * 'inbound' {@link StreamObserver}. That also means that the callback will be delayed if other
+ * callbacks are running.
+ *
+ * This method may only be called during the initial call to the application, before the
+ * service returns its {@code StreamObserver}.
+ *
+ * @param onSuccessHandler to call when the call has been successfully completed
+ */
+ public abstract void setOnSuccessHandler(Runnable onSuccessHandler);
}
diff --git a/stub/src/main/java/io/grpc/stub/ServerCalls.java b/stub/src/main/java/io/grpc/stub/ServerCalls.java
index ba08139b716..1d96d06078f 100644
--- a/stub/src/main/java/io/grpc/stub/ServerCalls.java
+++ b/stub/src/main/java/io/grpc/stub/ServerCalls.java
@@ -206,6 +206,13 @@ public void onReady() {
responseObserver.onReadyHandler.run();
}
}
+
+ @Override
+ public void onComplete() {
+ if (responseObserver.onSuccessHandler != null) {
+ responseObserver.onSuccessHandler.run();
+ }
+ }
}
}
@@ -291,6 +298,13 @@ public void onReady() {
responseObserver.onReadyHandler.run();
}
}
+
+ @Override
+ public void onComplete() {
+ if (responseObserver.onSuccessHandler != null) {
+ responseObserver.onSuccessHandler.run();
+ }
+ }
}
}
@@ -320,6 +334,7 @@ private static final class ServerCallStreamObserverImpl
private Runnable onCancelHandler;
private boolean aborted = false;
private boolean completed = false;
+ private Runnable onSuccessHandler;
// Non private to avoid synthetic class
ServerCallStreamObserverImpl(ServerCall call, boolean serverStreamingOrBidi) {
@@ -423,6 +438,14 @@ public void disableAutoRequest() {
public void request(int count) {
call.request(count);
}
+
+ @Override
+ public void setOnSuccessHandler(Runnable onSuccessHandler) {
+ checkState(!frozen, "Cannot alter onSuccessHandler after initialization. May only be called "
+ + "during the initial call to the application, before the service returns its "
+ + "StreamObserver");
+ this.onSuccessHandler = onSuccessHandler;
+ }
}
/**
diff --git a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java
index a2a1ef93961..8ff7fbc43ab 100644
--- a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java
+++ b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java
@@ -199,6 +199,34 @@ public StreamObserver invoke(StreamObserver responseObserver)
callObserver.get().onCompleted();
}
+ @Test
+ public void onSuccessHandlerCalledIfSet() throws Exception {
+ final AtomicBoolean onSuccess = new AtomicBoolean();
+ final AtomicReference> callObserver =
+ new AtomicReference<>();
+ ServerCallHandler callHandler =
+ ServerCalls.asyncBidiStreamingCall(
+ new ServerCalls.BidiStreamingMethod() {
+ @Override
+ public StreamObserver invoke(StreamObserver responseObserver) {
+ ServerCallStreamObserver serverCallObserver =
+ (ServerCallStreamObserver) responseObserver;
+ callObserver.set(serverCallObserver);
+ serverCallObserver.setOnSuccessHandler(new Runnable() {
+ @Override
+ public void run() {
+ onSuccess.set(true);
+ }
+ });
+ return new ServerCalls.NoopStreamObserver<>();
+ }
+ });
+ ServerCall.Listener callListener =
+ callHandler.startCall(serverCall, new Metadata());
+ callListener.onComplete();
+ assertTrue(onSuccess.get());
+ }
+
@Test
public void cannotSetOnCancelHandlerAfterServiceInvocation() throws Exception {
final AtomicReference> callObserver =
@@ -255,6 +283,34 @@ public void run() {
}
}
+ @Test
+ public void cannotSetOnSuccessHandlerAfterServiceInvocation() throws Exception {
+ final AtomicReference> callObserver =
+ new AtomicReference<>();
+ ServerCallHandler callHandler =
+ ServerCalls.asyncBidiStreamingCall(
+ new ServerCalls.BidiStreamingMethod() {
+ @Override
+ public StreamObserver invoke(StreamObserver responseObserver) {
+ callObserver.set((ServerCallStreamObserver) responseObserver);
+ return new ServerCalls.NoopStreamObserver<>();
+ }
+ });
+ ServerCall.Listener callListener =
+ callHandler.startCall(serverCall, new Metadata());
+ callListener.onMessage(1);
+ try {
+ callObserver.get().setOnSuccessHandler(new Runnable() {
+ @Override
+ public void run() {
+ }
+ });
+ fail("Cannot set onReady after service invocation");
+ } catch (IllegalStateException expected) {
+ // Expected
+ }
+ }
+
@Test
public void cannotDisableAutoRequestAfterServiceInvocation() throws Exception {
final AtomicReference> callObserver =
From ec8c489d2f15436a75471173d78fb5782d32a9be Mon Sep 17 00:00:00 2001
From: Piotr Morgwai Kotarbinski
Date: Wed, 1 Sep 2021 22:17:08 +0700
Subject: [PATCH 02/11] stub: rename onSuccessHandler to onFinishHandler
The handler is called also after onError(...), so onSuccess sounds
confusing in this situation.
---
.../io/grpc/stub/ServerCallStreamObserver.java | 15 +++++++++++----
stub/src/main/java/io/grpc/stub/ServerCalls.java | 16 ++++++++--------
.../test/java/io/grpc/stub/ServerCallsTest.java | 14 +++++++-------
3 files changed, 26 insertions(+), 19 deletions(-)
diff --git a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
index 8b19335f526..1815725d536 100644
--- a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
+++ b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
@@ -145,11 +145,18 @@ public void disableAutoRequest() {
public abstract void setMessageCompression(boolean enable);
/**
- * Sets a {@link Runnable} to be called when the call is successfully completed from the server's
- * point of view: all the messages have been put on the wire and the stream has been closed.
+ * Sets a {@link Runnable} to be executed when the call is correctly finished from the server's
+ * point of view: either {@link #onCompleted()} or {@link #onError(Throwable)} has been called,
+ * all the messages and trailing metadata have been put on the wire and the stream has been
+ * closed.
* Note however that the client still may have not received all the messages due to race
* conditions or crashes.
*
+ *
+ * Unless server exits abruptly, if both {@code onFinishHandler} and {@code onCancelHandler} are
+ * set, then after a call to either {@link #onCompleted()} or {@link #onError(Throwable)} exactly
+ * 1 of {@code onFinishHandler} and {@code onCancelHandler} will be called eventually.
+ *
* It is guaranteed that execution of the {@link Runnable} is serialized with calls to the
* 'inbound' {@link StreamObserver}. That also means that the callback will be delayed if other
* callbacks are running.
@@ -157,7 +164,7 @@ public void disableAutoRequest() {
* This method may only be called during the initial call to the application, before the
* service returns its {@code StreamObserver}.
*
- * @param onSuccessHandler to call when the call has been successfully completed
+ * @param onFinishHandler to call when the call has been correctly finished.
*/
- public abstract void setOnSuccessHandler(Runnable onSuccessHandler);
+ public abstract void setOnFinishHandler(Runnable onFinishHandler);
}
diff --git a/stub/src/main/java/io/grpc/stub/ServerCalls.java b/stub/src/main/java/io/grpc/stub/ServerCalls.java
index 1d96d06078f..5fb8a8d7f2c 100644
--- a/stub/src/main/java/io/grpc/stub/ServerCalls.java
+++ b/stub/src/main/java/io/grpc/stub/ServerCalls.java
@@ -209,8 +209,8 @@ public void onReady() {
@Override
public void onComplete() {
- if (responseObserver.onSuccessHandler != null) {
- responseObserver.onSuccessHandler.run();
+ if (responseObserver.onFinishHandler != null) {
+ responseObserver.onFinishHandler.run();
}
}
}
@@ -301,8 +301,8 @@ public void onReady() {
@Override
public void onComplete() {
- if (responseObserver.onSuccessHandler != null) {
- responseObserver.onSuccessHandler.run();
+ if (responseObserver.onFinishHandler != null) {
+ responseObserver.onFinishHandler.run();
}
}
}
@@ -334,7 +334,7 @@ private static final class ServerCallStreamObserverImpl
private Runnable onCancelHandler;
private boolean aborted = false;
private boolean completed = false;
- private Runnable onSuccessHandler;
+ private Runnable onFinishHandler;
// Non private to avoid synthetic class
ServerCallStreamObserverImpl(ServerCall call, boolean serverStreamingOrBidi) {
@@ -440,11 +440,11 @@ public void request(int count) {
}
@Override
- public void setOnSuccessHandler(Runnable onSuccessHandler) {
- checkState(!frozen, "Cannot alter onSuccessHandler after initialization. May only be called "
+ public void setOnFinishHandler(Runnable onFinishHandler) {
+ checkState(!frozen, "Cannot alter onFinishHandler after initialization. May only be called "
+ "during the initial call to the application, before the service returns its "
+ "StreamObserver");
- this.onSuccessHandler = onSuccessHandler;
+ this.onFinishHandler = onFinishHandler;
}
}
diff --git a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java
index 8ff7fbc43ab..82bb31aa0d3 100644
--- a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java
+++ b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java
@@ -200,8 +200,8 @@ public StreamObserver invoke(StreamObserver responseObserver)
}
@Test
- public void onSuccessHandlerCalledIfSet() throws Exception {
- final AtomicBoolean onSuccess = new AtomicBoolean();
+ public void onFinishHandlerCalledIfSet() throws Exception {
+ final AtomicBoolean onFinish = new AtomicBoolean();
final AtomicReference> callObserver =
new AtomicReference<>();
ServerCallHandler callHandler =
@@ -212,10 +212,10 @@ public StreamObserver invoke(StreamObserver responseObserver)
ServerCallStreamObserver serverCallObserver =
(ServerCallStreamObserver) responseObserver;
callObserver.set(serverCallObserver);
- serverCallObserver.setOnSuccessHandler(new Runnable() {
+ serverCallObserver.setOnFinishHandler(new Runnable() {
@Override
public void run() {
- onSuccess.set(true);
+ onFinish.set(true);
}
});
return new ServerCalls.NoopStreamObserver<>();
@@ -224,7 +224,7 @@ public void run() {
ServerCall.Listener callListener =
callHandler.startCall(serverCall, new Metadata());
callListener.onComplete();
- assertTrue(onSuccess.get());
+ assertTrue(onFinish.get());
}
@Test
@@ -284,7 +284,7 @@ public void run() {
}
@Test
- public void cannotSetOnSuccessHandlerAfterServiceInvocation() throws Exception {
+ public void cannotSetOnFinishHandlerAfterServiceInvocation() throws Exception {
final AtomicReference> callObserver =
new AtomicReference<>();
ServerCallHandler callHandler =
@@ -300,7 +300,7 @@ public StreamObserver invoke(StreamObserver responseObserver)
callHandler.startCall(serverCall, new Metadata());
callListener.onMessage(1);
try {
- callObserver.get().setOnSuccessHandler(new Runnable() {
+ callObserver.get().setOnFinishHandler(new Runnable() {
@Override
public void run() {
}
From 84b4e233855def3d99dfa9386fdb048c0e296330 Mon Sep 17 00:00:00 2001
From: Piotr Morgwai Kotarbinski
Date: Wed, 1 Sep 2021 22:45:33 +0700
Subject: [PATCH 03/11] stub: add @ExperimentalApi to setOnFinishHandler()
---
stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java | 3 +++
1 file changed, 3 insertions(+)
diff --git a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
index 1815725d536..c94ec16490a 100644
--- a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
+++ b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
@@ -16,6 +16,8 @@
package io.grpc.stub;
+import io.grpc.ExperimentalApi;
+
/**
* A refinement of {@link CallStreamObserver} to allows for interaction with call
* cancellation events on the server side.
@@ -166,5 +168,6 @@ public void disableAutoRequest() {
*
* @param onFinishHandler to call when the call has been correctly finished.
*/
+ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/8467")
public abstract void setOnFinishHandler(Runnable onFinishHandler);
}
From aecf3832990b30e9ab3e441d302fe86a2e83b6c4 Mon Sep 17 00:00:00 2001
From: Piotr Morgwai Kotarbinski
Date: Thu, 2 Sep 2021 14:47:13 +0700
Subject: [PATCH 04/11] stub: add test for setOnFinishHandler() in unary client
case
---
.../java/io/grpc/stub/ServerCallsTest.java | 61 +++++++++++++------
1 file changed, 42 insertions(+), 19 deletions(-)
diff --git a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java
index 82bb31aa0d3..7e1d597a8da 100644
--- a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java
+++ b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java
@@ -200,29 +200,52 @@ public StreamObserver invoke(StreamObserver responseObserver)
}
@Test
- public void onFinishHandlerCalledIfSet() throws Exception {
+ public void onFinishHandlerCalledIfSetInStreamingClientCall() throws Exception {
final AtomicBoolean onFinish = new AtomicBoolean();
- final AtomicReference> callObserver =
- new AtomicReference<>();
- ServerCallHandler callHandler =
- ServerCalls.asyncBidiStreamingCall(
- new ServerCalls.BidiStreamingMethod() {
+ final AtomicReference> callObserver = new AtomicReference<>();
+ ServerCallHandler callHandler = ServerCalls.asyncBidiStreamingCall(
+ new ServerCalls.BidiStreamingMethod() {
+ @Override
+ public StreamObserver invoke(StreamObserver responseObserver) {
+ ServerCallStreamObserver serverCallObserver =
+ (ServerCallStreamObserver) responseObserver;
+ callObserver.set(serverCallObserver);
+ serverCallObserver.setOnFinishHandler(new Runnable() {
@Override
- public StreamObserver invoke(StreamObserver responseObserver) {
- ServerCallStreamObserver serverCallObserver =
- (ServerCallStreamObserver) responseObserver;
- callObserver.set(serverCallObserver);
- serverCallObserver.setOnFinishHandler(new Runnable() {
- @Override
- public void run() {
- onFinish.set(true);
- }
- });
- return new ServerCalls.NoopStreamObserver<>();
+ public void run() {
+ onFinish.set(true);
}
});
- ServerCall.Listener callListener =
- callHandler.startCall(serverCall, new Metadata());
+ return new ServerCalls.NoopStreamObserver<>();
+ }
+ });
+ ServerCall.Listener callListener = callHandler.startCall(serverCall, new Metadata());
+ callListener.onComplete();
+ assertTrue(onFinish.get());
+ }
+
+ @Test
+ public void onFinishHandlerCalledIfSetInUnaryClientCall() throws Exception {
+ final AtomicBoolean onFinish = new AtomicBoolean();
+ final AtomicReference> callObserver = new AtomicReference<>();
+ ServerCallHandler callHandler = ServerCalls.asyncServerStreamingCall(
+ new ServerCalls.ServerStreamingMethod() {
+ @Override
+ public void invoke(Integer request, StreamObserver responseObserver) {
+ ServerCallStreamObserver serverCallObserver =
+ (ServerCallStreamObserver) responseObserver;
+ callObserver.set(serverCallObserver);
+ serverCallObserver.setOnFinishHandler(new Runnable() {
+ @Override
+ public void run() {
+ onFinish.set(true);
+ }
+ });
+ }
+ });
+ ServerCall.Listener callListener = callHandler.startCall(serverCall, new Metadata());
+ callListener.onMessage(0);
+ callListener.onHalfClose();
callListener.onComplete();
assertTrue(onFinish.get());
}
From 4408a91230d80ea65e0f23b35e61b38f9241aa2a Mon Sep 17 00:00:00 2001
From: Piotr Morgwai Kotarbinski
Date: Thu, 2 Sep 2021 15:05:24 +0700
Subject: [PATCH 05/11] stub: fix formatting in
cannotSetOnFinishHandlerAfterServiceInvocation
---
.../java/io/grpc/stub/ServerCallsTest.java | 23 ++++++++-----------
1 file changed, 10 insertions(+), 13 deletions(-)
diff --git a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java
index 7e1d597a8da..9bb738b237d 100644
--- a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java
+++ b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java
@@ -308,19 +308,16 @@ public void run() {
@Test
public void cannotSetOnFinishHandlerAfterServiceInvocation() throws Exception {
- final AtomicReference> callObserver =
- new AtomicReference<>();
- ServerCallHandler callHandler =
- ServerCalls.asyncBidiStreamingCall(
- new ServerCalls.BidiStreamingMethod() {
- @Override
- public StreamObserver invoke(StreamObserver responseObserver) {
- callObserver.set((ServerCallStreamObserver) responseObserver);
- return new ServerCalls.NoopStreamObserver<>();
- }
- });
- ServerCall.Listener callListener =
- callHandler.startCall(serverCall, new Metadata());
+ final AtomicReference> callObserver = new AtomicReference<>();
+ ServerCallHandler callHandler = ServerCalls.asyncBidiStreamingCall(
+ new ServerCalls.BidiStreamingMethod() {
+ @Override
+ public StreamObserver invoke(StreamObserver responseObserver) {
+ callObserver.set((ServerCallStreamObserver) responseObserver);
+ return new ServerCalls.NoopStreamObserver<>();
+ }
+ });
+ ServerCall.Listener callListener = callHandler.startCall(serverCall, new Metadata());
callListener.onMessage(1);
try {
callObserver.get().setOnFinishHandler(new Runnable() {
From 2690d5c9e89e9a448cc6cb2ce1062d1453d1cb4c Mon Sep 17 00:00:00 2001
From: Piotr Morgwai Kotarbinski
Date: Thu, 2 Sep 2021 15:18:55 +0700
Subject: [PATCH 06/11] stub: remove unused var from setOnFinishHandler(...)
tests
---
stub/src/test/java/io/grpc/stub/ServerCallsTest.java | 4 ----
1 file changed, 4 deletions(-)
diff --git a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java
index 9bb738b237d..39f17979e8b 100644
--- a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java
+++ b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java
@@ -202,14 +202,12 @@ public StreamObserver invoke(StreamObserver responseObserver)
@Test
public void onFinishHandlerCalledIfSetInStreamingClientCall() throws Exception {
final AtomicBoolean onFinish = new AtomicBoolean();
- final AtomicReference> callObserver = new AtomicReference<>();
ServerCallHandler callHandler = ServerCalls.asyncBidiStreamingCall(
new ServerCalls.BidiStreamingMethod() {
@Override
public StreamObserver invoke(StreamObserver responseObserver) {
ServerCallStreamObserver serverCallObserver =
(ServerCallStreamObserver) responseObserver;
- callObserver.set(serverCallObserver);
serverCallObserver.setOnFinishHandler(new Runnable() {
@Override
public void run() {
@@ -227,14 +225,12 @@ public void run() {
@Test
public void onFinishHandlerCalledIfSetInUnaryClientCall() throws Exception {
final AtomicBoolean onFinish = new AtomicBoolean();
- final AtomicReference> callObserver = new AtomicReference<>();
ServerCallHandler callHandler = ServerCalls.asyncServerStreamingCall(
new ServerCalls.ServerStreamingMethod() {
@Override
public void invoke(Integer request, StreamObserver responseObserver) {
ServerCallStreamObserver serverCallObserver =
(ServerCallStreamObserver) responseObserver;
- callObserver.set(serverCallObserver);
serverCallObserver.setOnFinishHandler(new Runnable() {
@Override
public void run() {
From 3429e67fe229fe6a613eef9fea2b86042622d24b Mon Sep 17 00:00:00 2001
From: Piotr Morgwai Kotarbinski
Date: Thu, 2 Sep 2021 16:23:09 +0700
Subject: [PATCH 07/11] stub: fix javadoc of setOnFinishHandler(...)
Fix some formatting and references.
---
.../java/io/grpc/stub/ServerCallStreamObserver.java | 13 ++++++-------
1 file changed, 6 insertions(+), 7 deletions(-)
diff --git a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
index c94ec16490a..7dae00961f7 100644
--- a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
+++ b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
@@ -154,14 +154,13 @@ public void disableAutoRequest() {
* Note however that the client still may have not received all the messages due to race
* conditions or crashes.
*
- *
- * Unless server exits abruptly, if both {@code onFinishHandler} and {@code onCancelHandler} are
- * set, then after a call to either {@link #onCompleted()} or {@link #onError(Throwable)} exactly
- * 1 of {@code onFinishHandler} and {@code onCancelHandler} will be called eventually.
+ * Unless server exits abruptly, if both {@code onFinishHandler} and {@code onCancelHandler}
+ * are set, then after a call to either {@link #onCompleted()} or {@link #onError(Throwable)}
+ * exactly 1 of {@code onFinishHandler} and {@code onCancelHandler} will be called eventually.
*
- * It is guaranteed that execution of the {@link Runnable} is serialized with calls to the
- * 'inbound' {@link StreamObserver}. That also means that the callback will be delayed if other
- * callbacks are running.
+ * It is guaranteed that execution of the {@code onFinishHandler} is serialized with calls to
+ * the 'inbound' {@link StreamObserver}. That also means that the callback will be delayed if
+ * other callbacks are running.
*
* This method may only be called during the initial call to the application, before the
* service returns its {@code StreamObserver}.
From b733f209d60abb3bc13664958519592321114093 Mon Sep 17 00:00:00 2001
From: Piotr Morgwai Kotarbinski
Date: Fri, 3 Sep 2021 13:13:28 +0700
Subject: [PATCH 08/11] stub: rename onFinishHandler to onCloseHandler
---
.../grpc/stub/ServerCallStreamObserver.java | 14 +++++------
.../main/java/io/grpc/stub/ServerCalls.java | 16 ++++++-------
.../java/io/grpc/stub/ServerCallsTest.java | 24 +++++++++----------
3 files changed, 27 insertions(+), 27 deletions(-)
diff --git a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
index 7dae00961f7..77ebef840d5 100644
--- a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
+++ b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
@@ -147,26 +147,26 @@ public void disableAutoRequest() {
public abstract void setMessageCompression(boolean enable);
/**
- * Sets a {@link Runnable} to be executed when the call is correctly finished from the server's
+ * Sets a {@link Runnable} to be executed when the call is closed correctly from the server's
* point of view: either {@link #onCompleted()} or {@link #onError(Throwable)} has been called,
* all the messages and trailing metadata have been put on the wire and the stream has been
* closed.
* Note however that the client still may have not received all the messages due to race
* conditions or crashes.
*
- * Unless server exits abruptly, if both {@code onFinishHandler} and {@code onCancelHandler}
+ *
Unless server exits abruptly, if both {@code onCloseHandler} and {@code onCancelHandler}
* are set, then after a call to either {@link #onCompleted()} or {@link #onError(Throwable)}
- * exactly 1 of {@code onFinishHandler} and {@code onCancelHandler} will be called eventually.
+ * exactly 1 of {@code onCloseHandler} and {@code onCancelHandler} will be called eventually.
*
- * It is guaranteed that execution of the {@code onFinishHandler} is serialized with calls to
+ *
It is guaranteed that execution of {@code onCloseHandler} is serialized with calls to
* the 'inbound' {@link StreamObserver}. That also means that the callback will be delayed if
* other callbacks are running.
*
* This method may only be called during the initial call to the application, before the
- * service returns its {@code StreamObserver}.
+ * service returns its {@link StreamObserver request observer}.
*
- * @param onFinishHandler to call when the call has been correctly finished.
+ * @param onCloseHandler to execute when the call has been closed correctly.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8467")
- public abstract void setOnFinishHandler(Runnable onFinishHandler);
+ public abstract void setOnCloseHandler(Runnable onCloseHandler);
}
diff --git a/stub/src/main/java/io/grpc/stub/ServerCalls.java b/stub/src/main/java/io/grpc/stub/ServerCalls.java
index 5fb8a8d7f2c..89e738adab1 100644
--- a/stub/src/main/java/io/grpc/stub/ServerCalls.java
+++ b/stub/src/main/java/io/grpc/stub/ServerCalls.java
@@ -209,8 +209,8 @@ public void onReady() {
@Override
public void onComplete() {
- if (responseObserver.onFinishHandler != null) {
- responseObserver.onFinishHandler.run();
+ if (responseObserver.onCloseHandler != null) {
+ responseObserver.onCloseHandler.run();
}
}
}
@@ -301,8 +301,8 @@ public void onReady() {
@Override
public void onComplete() {
- if (responseObserver.onFinishHandler != null) {
- responseObserver.onFinishHandler.run();
+ if (responseObserver.onCloseHandler != null) {
+ responseObserver.onCloseHandler.run();
}
}
}
@@ -334,7 +334,7 @@ private static final class ServerCallStreamObserverImpl
private Runnable onCancelHandler;
private boolean aborted = false;
private boolean completed = false;
- private Runnable onFinishHandler;
+ private Runnable onCloseHandler;
// Non private to avoid synthetic class
ServerCallStreamObserverImpl(ServerCall call, boolean serverStreamingOrBidi) {
@@ -440,11 +440,11 @@ public void request(int count) {
}
@Override
- public void setOnFinishHandler(Runnable onFinishHandler) {
- checkState(!frozen, "Cannot alter onFinishHandler after initialization. May only be called "
+ public void setOnCloseHandler(Runnable onCloseHandler) {
+ checkState(!frozen, "Cannot alter onCloseHandler after initialization. May only be called "
+ "during the initial call to the application, before the service returns its "
+ "StreamObserver");
- this.onFinishHandler = onFinishHandler;
+ this.onCloseHandler = onCloseHandler;
}
}
diff --git a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java
index 39f17979e8b..7227d26c5b8 100644
--- a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java
+++ b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java
@@ -200,18 +200,18 @@ public StreamObserver invoke(StreamObserver responseObserver)
}
@Test
- public void onFinishHandlerCalledIfSetInStreamingClientCall() throws Exception {
- final AtomicBoolean onFinish = new AtomicBoolean();
+ public void onCloseHandlerCalledIfSetInStreamingClientCall() throws Exception {
+ final AtomicBoolean onCloseHandlerCalled = new AtomicBoolean();
ServerCallHandler callHandler = ServerCalls.asyncBidiStreamingCall(
new ServerCalls.BidiStreamingMethod() {
@Override
public StreamObserver invoke(StreamObserver responseObserver) {
ServerCallStreamObserver serverCallObserver =
(ServerCallStreamObserver) responseObserver;
- serverCallObserver.setOnFinishHandler(new Runnable() {
+ serverCallObserver.setOnCloseHandler(new Runnable() {
@Override
public void run() {
- onFinish.set(true);
+ onCloseHandlerCalled.set(true);
}
});
return new ServerCalls.NoopStreamObserver<>();
@@ -219,22 +219,22 @@ public void run() {
});
ServerCall.Listener callListener = callHandler.startCall(serverCall, new Metadata());
callListener.onComplete();
- assertTrue(onFinish.get());
+ assertTrue(onCloseHandlerCalled.get());
}
@Test
- public void onFinishHandlerCalledIfSetInUnaryClientCall() throws Exception {
- final AtomicBoolean onFinish = new AtomicBoolean();
+ public void onCloseHandlerCalledIfSetInUnaryClientCall() throws Exception {
+ final AtomicBoolean onCloseHandlerCalled = new AtomicBoolean();
ServerCallHandler callHandler = ServerCalls.asyncServerStreamingCall(
new ServerCalls.ServerStreamingMethod() {
@Override
public void invoke(Integer request, StreamObserver responseObserver) {
ServerCallStreamObserver serverCallObserver =
(ServerCallStreamObserver) responseObserver;
- serverCallObserver.setOnFinishHandler(new Runnable() {
+ serverCallObserver.setOnCloseHandler(new Runnable() {
@Override
public void run() {
- onFinish.set(true);
+ onCloseHandlerCalled.set(true);
}
});
}
@@ -243,7 +243,7 @@ public void run() {
callListener.onMessage(0);
callListener.onHalfClose();
callListener.onComplete();
- assertTrue(onFinish.get());
+ assertTrue(onCloseHandlerCalled.get());
}
@Test
@@ -303,7 +303,7 @@ public void run() {
}
@Test
- public void cannotSetOnFinishHandlerAfterServiceInvocation() throws Exception {
+ public void cannotSetOnCloseHandlerAfterServiceInvocation() throws Exception {
final AtomicReference> callObserver = new AtomicReference<>();
ServerCallHandler callHandler = ServerCalls.asyncBidiStreamingCall(
new ServerCalls.BidiStreamingMethod() {
@@ -316,7 +316,7 @@ public StreamObserver invoke(StreamObserver responseObserver)
ServerCall.Listener callListener = callHandler.startCall(serverCall, new Metadata());
callListener.onMessage(1);
try {
- callObserver.get().setOnFinishHandler(new Runnable() {
+ callObserver.get().setOnCloseHandler(new Runnable() {
@Override
public void run() {
}
From 9720b63eb9985710be3ed64f3938a5a289bd689d Mon Sep 17 00:00:00 2001
From: Piotr Morgwai Kotarbinski
Date: Sat, 4 Sep 2021 14:48:25 +0700
Subject: [PATCH 09/11] stub: apply review comments to setOnCloseHandler()
javadoc
---
.../main/java/io/grpc/stub/ServerCallStreamObserver.java | 9 ++++-----
1 file changed, 4 insertions(+), 5 deletions(-)
diff --git a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
index 77ebef840d5..ae5c65877e3 100644
--- a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
+++ b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
@@ -147,12 +147,11 @@ public void disableAutoRequest() {
public abstract void setMessageCompression(boolean enable);
/**
- * Sets a {@link Runnable} to be executed when the call is closed correctly from the server's
+ * Sets a {@link Runnable} to be executed when the call is closed cleanly from the server's
* point of view: either {@link #onCompleted()} or {@link #onError(Throwable)} has been called,
* all the messages and trailing metadata have been put on the wire and the stream has been
- * closed.
- * Note however that the client still may have not received all the messages due to race
- * conditions or crashes.
+ * closed. Note however that the client still may have not received all the messages due to
+ * network delay, client crashes, and cancellation races.
*
* Unless server exits abruptly, if both {@code onCloseHandler} and {@code onCancelHandler}
* are set, then after a call to either {@link #onCompleted()} or {@link #onError(Throwable)}
@@ -165,7 +164,7 @@ public void disableAutoRequest() {
*
This method may only be called during the initial call to the application, before the
* service returns its {@link StreamObserver request observer}.
*
- * @param onCloseHandler to execute when the call has been closed correctly.
+ * @param onCloseHandler to execute when the call has been closed cleanly.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8467")
public abstract void setOnCloseHandler(Runnable onCloseHandler);
From 597c8aee1884b79a1d56e6fa4c134c95bfdfc871 Mon Sep 17 00:00:00 2001
From: Piotr Morgwai Kotarbinski
Date: Sat, 4 Sep 2021 14:58:17 +0700
Subject: [PATCH 10/11] stub: apply review comments to setOnCloseHandler()
javadoc, part 2
---
.../src/main/java/io/grpc/stub/ServerCallStreamObserver.java | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)
diff --git a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
index ae5c65877e3..0178e62a35a 100644
--- a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
+++ b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
@@ -153,9 +153,8 @@ public void disableAutoRequest() {
* closed. Note however that the client still may have not received all the messages due to
* network delay, client crashes, and cancellation races.
*
- * Unless server exits abruptly, if both {@code onCloseHandler} and {@code onCancelHandler}
- * are set, then after a call to either {@link #onCompleted()} or {@link #onError(Throwable)}
- * exactly 1 of {@code onCloseHandler} and {@code onCancelHandler} will be called eventually.
+ * Exactly one of {@code onCloseHandler} and {@code onCancelHandler} is guaranteed to be called
+ * when the RPC terminates.
*
* It is guaranteed that execution of {@code onCloseHandler} is serialized with calls to
* the 'inbound' {@link StreamObserver}. That also means that the callback will be delayed if
From dd2bb58cb6586da984d250ecc019aa8ff2faf85c Mon Sep 17 00:00:00 2001
From: Piotr Morgwai Kotarbinski
Date: Sat, 4 Sep 2021 15:55:46 +0700
Subject: [PATCH 11/11] stub: apply review comments to setOnCloseHandler()
javadoc, part 3
---
.../main/java/io/grpc/stub/ServerCallStreamObserver.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
index 0178e62a35a..18255ddd6d4 100644
--- a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
+++ b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
@@ -149,9 +149,9 @@ public void disableAutoRequest() {
/**
* Sets a {@link Runnable} to be executed when the call is closed cleanly from the server's
* point of view: either {@link #onCompleted()} or {@link #onError(Throwable)} has been called,
- * all the messages and trailing metadata have been put on the wire and the stream has been
- * closed. Note however that the client still may have not received all the messages due to
- * network delay, client crashes, and cancellation races.
+ * all the messages and trailing metadata have been sent and the stream has been closed. Note
+ * however that the client still may have not received all the messages due to network delay,
+ * client crashes, and cancellation races.
*
* Exactly one of {@code onCloseHandler} and {@code onCancelHandler} is guaranteed to be called
* when the RPC terminates.