From b68d40329c45f646ea729d0d79cf1a0bdde80ca7 Mon Sep 17 00:00:00 2001 From: Piotr Morgwai Kotarbinski Date: Thu, 26 Aug 2021 17:35:50 +0700 Subject: [PATCH] stub: add ServerCallStreamObserver.setOnSuccessHandler(...) (#5895) This allows for user code to be notified when the messages are actually put on the wire and the stream is closed. --- .../grpc/stub/ServerCallStreamObserver.java | 17 ++++++ .../main/java/io/grpc/stub/ServerCalls.java | 23 ++++++++ .../java/io/grpc/stub/ServerCallsTest.java | 56 +++++++++++++++++++ 3 files changed, 96 insertions(+) diff --git a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java index 3ba1bf563ef..8b19335f526 100644 --- a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java +++ b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java @@ -143,4 +143,21 @@ public void disableAutoRequest() { */ @Override public abstract void setMessageCompression(boolean enable); + + /** + * Sets a {@link Runnable} to be called when the call is successfully completed from the server's + * point of view: all the messages have been put on the wire and the stream has been closed. + * Note however that the client still may have not received all the messages due to race + * conditions or crashes. + * + *

It is guaranteed that execution of the {@link Runnable} is serialized with calls to the + * 'inbound' {@link StreamObserver}. That also means that the callback will be delayed if other + * callbacks are running.

+ * + *

This method may only be called during the initial call to the application, before the + * service returns its {@code StreamObserver}.

+ * + * @param onSuccessHandler to call when the call has been successfully completed + */ + public abstract void setOnSuccessHandler(Runnable onSuccessHandler); } diff --git a/stub/src/main/java/io/grpc/stub/ServerCalls.java b/stub/src/main/java/io/grpc/stub/ServerCalls.java index ba08139b716..1d96d06078f 100644 --- a/stub/src/main/java/io/grpc/stub/ServerCalls.java +++ b/stub/src/main/java/io/grpc/stub/ServerCalls.java @@ -206,6 +206,13 @@ public void onReady() { responseObserver.onReadyHandler.run(); } } + + @Override + public void onComplete() { + if (responseObserver.onSuccessHandler != null) { + responseObserver.onSuccessHandler.run(); + } + } } } @@ -291,6 +298,13 @@ public void onReady() { responseObserver.onReadyHandler.run(); } } + + @Override + public void onComplete() { + if (responseObserver.onSuccessHandler != null) { + responseObserver.onSuccessHandler.run(); + } + } } } @@ -320,6 +334,7 @@ private static final class ServerCallStreamObserverImpl private Runnable onCancelHandler; private boolean aborted = false; private boolean completed = false; + private Runnable onSuccessHandler; // Non private to avoid synthetic class ServerCallStreamObserverImpl(ServerCall call, boolean serverStreamingOrBidi) { @@ -423,6 +438,14 @@ public void disableAutoRequest() { public void request(int count) { call.request(count); } + + @Override + public void setOnSuccessHandler(Runnable onSuccessHandler) { + checkState(!frozen, "Cannot alter onSuccessHandler after initialization. May only be called " + + "during the initial call to the application, before the service returns its " + + "StreamObserver"); + this.onSuccessHandler = onSuccessHandler; + } } /** diff --git a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java index a2a1ef93961..8ff7fbc43ab 100644 --- a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java +++ b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java @@ -199,6 +199,34 @@ public StreamObserver invoke(StreamObserver responseObserver) callObserver.get().onCompleted(); } + @Test + public void onSuccessHandlerCalledIfSet() throws Exception { + final AtomicBoolean onSuccess = new AtomicBoolean(); + final AtomicReference> callObserver = + new AtomicReference<>(); + ServerCallHandler callHandler = + ServerCalls.asyncBidiStreamingCall( + new ServerCalls.BidiStreamingMethod() { + @Override + public StreamObserver invoke(StreamObserver responseObserver) { + ServerCallStreamObserver serverCallObserver = + (ServerCallStreamObserver) responseObserver; + callObserver.set(serverCallObserver); + serverCallObserver.setOnSuccessHandler(new Runnable() { + @Override + public void run() { + onSuccess.set(true); + } + }); + return new ServerCalls.NoopStreamObserver<>(); + } + }); + ServerCall.Listener callListener = + callHandler.startCall(serverCall, new Metadata()); + callListener.onComplete(); + assertTrue(onSuccess.get()); + } + @Test public void cannotSetOnCancelHandlerAfterServiceInvocation() throws Exception { final AtomicReference> callObserver = @@ -255,6 +283,34 @@ public void run() { } } + @Test + public void cannotSetOnSuccessHandlerAfterServiceInvocation() throws Exception { + final AtomicReference> callObserver = + new AtomicReference<>(); + ServerCallHandler callHandler = + ServerCalls.asyncBidiStreamingCall( + new ServerCalls.BidiStreamingMethod() { + @Override + public StreamObserver invoke(StreamObserver responseObserver) { + callObserver.set((ServerCallStreamObserver) responseObserver); + return new ServerCalls.NoopStreamObserver<>(); + } + }); + ServerCall.Listener callListener = + callHandler.startCall(serverCall, new Metadata()); + callListener.onMessage(1); + try { + callObserver.get().setOnSuccessHandler(new Runnable() { + @Override + public void run() { + } + }); + fail("Cannot set onReady after service invocation"); + } catch (IllegalStateException expected) { + // Expected + } + } + @Test public void cannotDisableAutoRequestAfterServiceInvocation() throws Exception { final AtomicReference> callObserver =