Skip to content

Commit

Permalink
stub: add ServerCallStreamObserver.setOnCloseHandler(...) (#8452)
Browse files Browse the repository at this point in the history
This allows for user code to be notified when the messages are actually
put on the wire and the stream is closed.

Fixes #5895
  • Loading branch information
morgwai committed Sep 21, 2021
1 parent 29d238a commit a6abb1b
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 0 deletions.
24 changes: 24 additions & 0 deletions stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
Expand Up @@ -16,6 +16,8 @@

package io.grpc.stub;

import io.grpc.ExperimentalApi;

/**
* A refinement of {@link CallStreamObserver} to allows for interaction with call
* cancellation events on the server side. An instance of this class is obtained by casting the
Expand Down Expand Up @@ -145,4 +147,26 @@ public void disableAutoRequest() {
*/
@Override
public abstract void setMessageCompression(boolean enable);

/**
* Sets a {@link Runnable} to be executed when the call is closed cleanly from the server's
* point of view: either {@link #onCompleted()} or {@link #onError(Throwable)} has been called,
* all the messages and trailing metadata have been sent and the stream has been closed. Note
* however that the client still may have not received all the messages due to network delay,
* client crashes, and cancellation races.
*
* <p>Exactly one of {@code onCloseHandler} and {@code onCancelHandler} is guaranteed to be called
* when the RPC terminates.</p>
*
* <p>It is guaranteed that execution of {@code onCloseHandler} is serialized with calls to
* the 'inbound' {@link StreamObserver}. That also means that the callback will be delayed if
* other callbacks are running.</p>
*
* <p>This method may only be called during the initial call to the application, before the
* service returns its {@link StreamObserver request observer}.</p>
*
* @param onCloseHandler to execute when the call has been closed cleanly.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8467")
public abstract void setOnCloseHandler(Runnable onCloseHandler);
}
23 changes: 23 additions & 0 deletions stub/src/main/java/io/grpc/stub/ServerCalls.java
Expand Up @@ -206,6 +206,13 @@ public void onReady() {
responseObserver.onReadyHandler.run();
}
}

@Override
public void onComplete() {
if (responseObserver.onCloseHandler != null) {
responseObserver.onCloseHandler.run();
}
}
}
}

Expand Down Expand Up @@ -291,6 +298,13 @@ public void onReady() {
responseObserver.onReadyHandler.run();
}
}

@Override
public void onComplete() {
if (responseObserver.onCloseHandler != null) {
responseObserver.onCloseHandler.run();
}
}
}
}

Expand Down Expand Up @@ -320,6 +334,7 @@ private static final class ServerCallStreamObserverImpl<ReqT, RespT>
private Runnable onCancelHandler;
private boolean aborted = false;
private boolean completed = false;
private Runnable onCloseHandler;

// Non private to avoid synthetic class
ServerCallStreamObserverImpl(ServerCall<ReqT, RespT> call, boolean serverStreamingOrBidi) {
Expand Down Expand Up @@ -423,6 +438,14 @@ public void disableAutoRequest() {
public void request(int count) {
call.request(count);
}

@Override
public void setOnCloseHandler(Runnable onCloseHandler) {
checkState(!frozen, "Cannot alter onCloseHandler after initialization. May only be called "
+ "during the initial call to the application, before the service returns its "
+ "StreamObserver");
this.onCloseHandler = onCloseHandler;
}
}

/**
Expand Down
72 changes: 72 additions & 0 deletions stub/src/test/java/io/grpc/stub/ServerCallsTest.java
Expand Up @@ -199,6 +199,53 @@ public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver)
callObserver.get().onCompleted();
}

@Test
public void onCloseHandlerCalledIfSetInStreamingClientCall() throws Exception {
final AtomicBoolean onCloseHandlerCalled = new AtomicBoolean();
ServerCallHandler<Integer, Integer> callHandler = ServerCalls.asyncBidiStreamingCall(
new ServerCalls.BidiStreamingMethod<Integer, Integer>() {
@Override
public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver) {
ServerCallStreamObserver<Integer> serverCallObserver =
(ServerCallStreamObserver<Integer>) responseObserver;
serverCallObserver.setOnCloseHandler(new Runnable() {
@Override
public void run() {
onCloseHandlerCalled.set(true);
}
});
return new ServerCalls.NoopStreamObserver<>();
}
});
ServerCall.Listener<Integer> callListener = callHandler.startCall(serverCall, new Metadata());
callListener.onComplete();
assertTrue(onCloseHandlerCalled.get());
}

@Test
public void onCloseHandlerCalledIfSetInUnaryClientCall() throws Exception {
final AtomicBoolean onCloseHandlerCalled = new AtomicBoolean();
ServerCallHandler<Integer, Integer> callHandler = ServerCalls.asyncServerStreamingCall(
new ServerCalls.ServerStreamingMethod<Integer, Integer>() {
@Override
public void invoke(Integer request, StreamObserver<Integer> responseObserver) {
ServerCallStreamObserver<Integer> serverCallObserver =
(ServerCallStreamObserver<Integer>) responseObserver;
serverCallObserver.setOnCloseHandler(new Runnable() {
@Override
public void run() {
onCloseHandlerCalled.set(true);
}
});
}
});
ServerCall.Listener<Integer> callListener = callHandler.startCall(serverCall, new Metadata());
callListener.onMessage(0);
callListener.onHalfClose();
callListener.onComplete();
assertTrue(onCloseHandlerCalled.get());
}

@Test
public void cannotSetOnCancelHandlerAfterServiceInvocation() throws Exception {
final AtomicReference<ServerCallStreamObserver<Integer>> callObserver =
Expand Down Expand Up @@ -255,6 +302,31 @@ public void run() {
}
}

@Test
public void cannotSetOnCloseHandlerAfterServiceInvocation() throws Exception {
final AtomicReference<ServerCallStreamObserver<Integer>> callObserver = new AtomicReference<>();
ServerCallHandler<Integer, Integer> callHandler = ServerCalls.asyncBidiStreamingCall(
new ServerCalls.BidiStreamingMethod<Integer, Integer>() {
@Override
public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver) {
callObserver.set((ServerCallStreamObserver<Integer>) responseObserver);
return new ServerCalls.NoopStreamObserver<>();
}
});
ServerCall.Listener<Integer> callListener = callHandler.startCall(serverCall, new Metadata());
callListener.onMessage(1);
try {
callObserver.get().setOnCloseHandler(new Runnable() {
@Override
public void run() {
}
});
fail("Cannot set onReady after service invocation");
} catch (IllegalStateException expected) {
// Expected
}
}

@Test
public void cannotDisableAutoRequestAfterServiceInvocation() throws Exception {
final AtomicReference<ServerCallStreamObserver<Integer>> callObserver =
Expand Down

0 comments on commit a6abb1b

Please sign in to comment.