From a6abb1b8d97b3dda354673a4e10fab6a1472fc0c Mon Sep 17 00:00:00 2001 From: Piotr Morgwai Kotarbinski Date: Wed, 22 Sep 2021 01:31:04 +0700 Subject: [PATCH] stub: add ServerCallStreamObserver.setOnCloseHandler(...) (#8452) This allows for user code to be notified when the messages are actually put on the wire and the stream is closed. Fixes #5895 --- .../grpc/stub/ServerCallStreamObserver.java | 24 +++++++ .../main/java/io/grpc/stub/ServerCalls.java | 23 ++++++ .../java/io/grpc/stub/ServerCallsTest.java | 72 +++++++++++++++++++ 3 files changed, 119 insertions(+) diff --git a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java index 00d6f5d3c7c..e31cae1fad4 100644 --- a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java +++ b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java @@ -16,6 +16,8 @@ package io.grpc.stub; +import io.grpc.ExperimentalApi; + /** * A refinement of {@link CallStreamObserver} to allows for interaction with call * cancellation events on the server side. An instance of this class is obtained by casting the @@ -145,4 +147,26 @@ public void disableAutoRequest() { */ @Override public abstract void setMessageCompression(boolean enable); + + /** + * Sets a {@link Runnable} to be executed when the call is closed cleanly from the server's + * point of view: either {@link #onCompleted()} or {@link #onError(Throwable)} has been called, + * all the messages and trailing metadata have been sent and the stream has been closed. Note + * however that the client still may have not received all the messages due to network delay, + * client crashes, and cancellation races. + * + *

Exactly one of {@code onCloseHandler} and {@code onCancelHandler} is guaranteed to be called + * when the RPC terminates.

+ * + *

It is guaranteed that execution of {@code onCloseHandler} is serialized with calls to + * the 'inbound' {@link StreamObserver}. That also means that the callback will be delayed if + * other callbacks are running.

+ * + *

This method may only be called during the initial call to the application, before the + * service returns its {@link StreamObserver request observer}.

+ * + * @param onCloseHandler to execute when the call has been closed cleanly. + */ + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/8467") + public abstract void setOnCloseHandler(Runnable onCloseHandler); } diff --git a/stub/src/main/java/io/grpc/stub/ServerCalls.java b/stub/src/main/java/io/grpc/stub/ServerCalls.java index 0f7d6d09ab1..09f86d0364c 100644 --- a/stub/src/main/java/io/grpc/stub/ServerCalls.java +++ b/stub/src/main/java/io/grpc/stub/ServerCalls.java @@ -206,6 +206,13 @@ public void onReady() { responseObserver.onReadyHandler.run(); } } + + @Override + public void onComplete() { + if (responseObserver.onCloseHandler != null) { + responseObserver.onCloseHandler.run(); + } + } } } @@ -291,6 +298,13 @@ public void onReady() { responseObserver.onReadyHandler.run(); } } + + @Override + public void onComplete() { + if (responseObserver.onCloseHandler != null) { + responseObserver.onCloseHandler.run(); + } + } } } @@ -320,6 +334,7 @@ private static final class ServerCallStreamObserverImpl private Runnable onCancelHandler; private boolean aborted = false; private boolean completed = false; + private Runnable onCloseHandler; // Non private to avoid synthetic class ServerCallStreamObserverImpl(ServerCall call, boolean serverStreamingOrBidi) { @@ -423,6 +438,14 @@ public void disableAutoRequest() { public void request(int count) { call.request(count); } + + @Override + public void setOnCloseHandler(Runnable onCloseHandler) { + checkState(!frozen, "Cannot alter onCloseHandler after initialization. May only be called " + + "during the initial call to the application, before the service returns its " + + "StreamObserver"); + this.onCloseHandler = onCloseHandler; + } } /** diff --git a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java index a2a1ef93961..7227d26c5b8 100644 --- a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java +++ b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java @@ -199,6 +199,53 @@ public StreamObserver invoke(StreamObserver responseObserver) callObserver.get().onCompleted(); } + @Test + public void onCloseHandlerCalledIfSetInStreamingClientCall() throws Exception { + final AtomicBoolean onCloseHandlerCalled = new AtomicBoolean(); + ServerCallHandler callHandler = ServerCalls.asyncBidiStreamingCall( + new ServerCalls.BidiStreamingMethod() { + @Override + public StreamObserver invoke(StreamObserver responseObserver) { + ServerCallStreamObserver serverCallObserver = + (ServerCallStreamObserver) responseObserver; + serverCallObserver.setOnCloseHandler(new Runnable() { + @Override + public void run() { + onCloseHandlerCalled.set(true); + } + }); + return new ServerCalls.NoopStreamObserver<>(); + } + }); + ServerCall.Listener callListener = callHandler.startCall(serverCall, new Metadata()); + callListener.onComplete(); + assertTrue(onCloseHandlerCalled.get()); + } + + @Test + public void onCloseHandlerCalledIfSetInUnaryClientCall() throws Exception { + final AtomicBoolean onCloseHandlerCalled = new AtomicBoolean(); + ServerCallHandler callHandler = ServerCalls.asyncServerStreamingCall( + new ServerCalls.ServerStreamingMethod() { + @Override + public void invoke(Integer request, StreamObserver responseObserver) { + ServerCallStreamObserver serverCallObserver = + (ServerCallStreamObserver) responseObserver; + serverCallObserver.setOnCloseHandler(new Runnable() { + @Override + public void run() { + onCloseHandlerCalled.set(true); + } + }); + } + }); + ServerCall.Listener callListener = callHandler.startCall(serverCall, new Metadata()); + callListener.onMessage(0); + callListener.onHalfClose(); + callListener.onComplete(); + assertTrue(onCloseHandlerCalled.get()); + } + @Test public void cannotSetOnCancelHandlerAfterServiceInvocation() throws Exception { final AtomicReference> callObserver = @@ -255,6 +302,31 @@ public void run() { } } + @Test + public void cannotSetOnCloseHandlerAfterServiceInvocation() throws Exception { + final AtomicReference> callObserver = new AtomicReference<>(); + ServerCallHandler callHandler = ServerCalls.asyncBidiStreamingCall( + new ServerCalls.BidiStreamingMethod() { + @Override + public StreamObserver invoke(StreamObserver responseObserver) { + callObserver.set((ServerCallStreamObserver) responseObserver); + return new ServerCalls.NoopStreamObserver<>(); + } + }); + ServerCall.Listener callListener = callHandler.startCall(serverCall, new Metadata()); + callListener.onMessage(1); + try { + callObserver.get().setOnCloseHandler(new Runnable() { + @Override + public void run() { + } + }); + fail("Cannot set onReady after service invocation"); + } catch (IllegalStateException expected) { + // Expected + } + } + @Test public void cannotDisableAutoRequestAfterServiceInvocation() throws Exception { final AtomicReference> callObserver =