diff --git a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
index 3ba1bf563ef..8b19335f526 100644
--- a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
+++ b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
@@ -143,4 +143,21 @@ public void disableAutoRequest() {
*/
@Override
public abstract void setMessageCompression(boolean enable);
+
+ /**
+ * Sets a {@link Runnable} to be called when the call is successfully completed from the server's
+ * point of view: all the messages have been put on the wire and the stream has been closed.
+ * Note however that the client still may have not received all the messages due to race
+ * conditions or crashes.
+ *
+ *
It is guaranteed that execution of the {@link Runnable} is serialized with calls to the
+ * 'inbound' {@link StreamObserver}. That also means that the callback will be delayed if other
+ * callbacks are running.
+ *
+ * This method may only be called during the initial call to the application, before the
+ * service returns its {@code StreamObserver}.
+ *
+ * @param onSuccessHandler to call when the call has been successfully completed
+ */
+ public abstract void setOnSuccessHandler(Runnable onSuccessHandler);
}
diff --git a/stub/src/main/java/io/grpc/stub/ServerCalls.java b/stub/src/main/java/io/grpc/stub/ServerCalls.java
index ba08139b716..1d96d06078f 100644
--- a/stub/src/main/java/io/grpc/stub/ServerCalls.java
+++ b/stub/src/main/java/io/grpc/stub/ServerCalls.java
@@ -206,6 +206,13 @@ public void onReady() {
responseObserver.onReadyHandler.run();
}
}
+
+ @Override
+ public void onComplete() {
+ if (responseObserver.onSuccessHandler != null) {
+ responseObserver.onSuccessHandler.run();
+ }
+ }
}
}
@@ -291,6 +298,13 @@ public void onReady() {
responseObserver.onReadyHandler.run();
}
}
+
+ @Override
+ public void onComplete() {
+ if (responseObserver.onSuccessHandler != null) {
+ responseObserver.onSuccessHandler.run();
+ }
+ }
}
}
@@ -320,6 +334,7 @@ private static final class ServerCallStreamObserverImpl
private Runnable onCancelHandler;
private boolean aborted = false;
private boolean completed = false;
+ private Runnable onSuccessHandler;
// Non private to avoid synthetic class
ServerCallStreamObserverImpl(ServerCall call, boolean serverStreamingOrBidi) {
@@ -423,6 +438,14 @@ public void disableAutoRequest() {
public void request(int count) {
call.request(count);
}
+
+ @Override
+ public void setOnSuccessHandler(Runnable onSuccessHandler) {
+ checkState(!frozen, "Cannot alter onSuccessHandler after initialization. May only be called "
+ + "during the initial call to the application, before the service returns its "
+ + "StreamObserver");
+ this.onSuccessHandler = onSuccessHandler;
+ }
}
/**
diff --git a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java
index a2a1ef93961..8ff7fbc43ab 100644
--- a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java
+++ b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java
@@ -199,6 +199,34 @@ public StreamObserver invoke(StreamObserver responseObserver)
callObserver.get().onCompleted();
}
+ @Test
+ public void onSuccessHandlerCalledIfSet() throws Exception {
+ final AtomicBoolean onSuccess = new AtomicBoolean();
+ final AtomicReference> callObserver =
+ new AtomicReference<>();
+ ServerCallHandler callHandler =
+ ServerCalls.asyncBidiStreamingCall(
+ new ServerCalls.BidiStreamingMethod() {
+ @Override
+ public StreamObserver invoke(StreamObserver responseObserver) {
+ ServerCallStreamObserver serverCallObserver =
+ (ServerCallStreamObserver) responseObserver;
+ callObserver.set(serverCallObserver);
+ serverCallObserver.setOnSuccessHandler(new Runnable() {
+ @Override
+ public void run() {
+ onSuccess.set(true);
+ }
+ });
+ return new ServerCalls.NoopStreamObserver<>();
+ }
+ });
+ ServerCall.Listener callListener =
+ callHandler.startCall(serverCall, new Metadata());
+ callListener.onComplete();
+ assertTrue(onSuccess.get());
+ }
+
@Test
public void cannotSetOnCancelHandlerAfterServiceInvocation() throws Exception {
final AtomicReference> callObserver =
@@ -255,6 +283,34 @@ public void run() {
}
}
+ @Test
+ public void cannotSetOnSuccessHandlerAfterServiceInvocation() throws Exception {
+ final AtomicReference> callObserver =
+ new AtomicReference<>();
+ ServerCallHandler callHandler =
+ ServerCalls.asyncBidiStreamingCall(
+ new ServerCalls.BidiStreamingMethod() {
+ @Override
+ public StreamObserver invoke(StreamObserver responseObserver) {
+ callObserver.set((ServerCallStreamObserver) responseObserver);
+ return new ServerCalls.NoopStreamObserver<>();
+ }
+ });
+ ServerCall.Listener callListener =
+ callHandler.startCall(serverCall, new Metadata());
+ callListener.onMessage(1);
+ try {
+ callObserver.get().setOnSuccessHandler(new Runnable() {
+ @Override
+ public void run() {
+ }
+ });
+ fail("Cannot set onReady after service invocation");
+ } catch (IllegalStateException expected) {
+ // Expected
+ }
+ }
+
@Test
public void cannotDisableAutoRequestAfterServiceInvocation() throws Exception {
final AtomicReference> callObserver =