Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stub: add ServerCallStreamObserver.setOnCloseHandler(...) (#5895) #8452

Merged
merged 11 commits into from Sep 21, 2021
Merged
26 changes: 26 additions & 0 deletions stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
Expand Up @@ -16,6 +16,8 @@

package io.grpc.stub;

import io.grpc.ExperimentalApi;

/**
* A refinement of {@link CallStreamObserver} to allows for interaction with call
* cancellation events on the server side.
Expand Down Expand Up @@ -143,4 +145,28 @@ public void disableAutoRequest() {
*/
@Override
public abstract void setMessageCompression(boolean enable);

/**
* Sets a {@link Runnable} to be executed when the call is closed correctly from the server's
morgwai marked this conversation as resolved.
Show resolved Hide resolved
* point of view: either {@link #onCompleted()} or {@link #onError(Throwable)} has been called,
* all the messages and trailing metadata have been put on the wire and the stream has been
morgwai marked this conversation as resolved.
Show resolved Hide resolved
* closed.
* Note however that the client still may have not received all the messages due to race
morgwai marked this conversation as resolved.
Show resolved Hide resolved
* conditions or crashes.
*
* <p>Unless server exits abruptly, if both {@code onCloseHandler} and {@code onCancelHandler}
morgwai marked this conversation as resolved.
Show resolved Hide resolved
* are set, then after a call to either {@link #onCompleted()} or {@link #onError(Throwable)}
* exactly 1 of {@code onCloseHandler} and {@code onCancelHandler} will be called eventually.</p>
morgwai marked this conversation as resolved.
Show resolved Hide resolved
*
* <p>It is guaranteed that execution of {@code onCloseHandler} is serialized with calls to
* the 'inbound' {@link StreamObserver}. That also means that the callback will be delayed if
* other callbacks are running.</p>
*
* <p>This method may only be called during the initial call to the application, before the
* service returns its {@link StreamObserver request observer}.</p>
*
* @param onCloseHandler to execute when the call has been closed correctly.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8467")
public abstract void setOnCloseHandler(Runnable onCloseHandler);
}
23 changes: 23 additions & 0 deletions stub/src/main/java/io/grpc/stub/ServerCalls.java
Expand Up @@ -206,6 +206,13 @@ public void onReady() {
responseObserver.onReadyHandler.run();
}
}

@Override
public void onComplete() {
if (responseObserver.onCloseHandler != null) {
responseObserver.onCloseHandler.run();
}
}
}
}

Expand Down Expand Up @@ -291,6 +298,13 @@ public void onReady() {
responseObserver.onReadyHandler.run();
}
}

@Override
public void onComplete() {
if (responseObserver.onCloseHandler != null) {
responseObserver.onCloseHandler.run();
}
}
}
}

Expand Down Expand Up @@ -320,6 +334,7 @@ private static final class ServerCallStreamObserverImpl<ReqT, RespT>
private Runnable onCancelHandler;
private boolean aborted = false;
private boolean completed = false;
private Runnable onCloseHandler;

// Non private to avoid synthetic class
ServerCallStreamObserverImpl(ServerCall<ReqT, RespT> call, boolean serverStreamingOrBidi) {
Expand Down Expand Up @@ -423,6 +438,14 @@ public void disableAutoRequest() {
public void request(int count) {
call.request(count);
}

@Override
public void setOnCloseHandler(Runnable onCloseHandler) {
checkState(!frozen, "Cannot alter onCloseHandler after initialization. May only be called "
+ "during the initial call to the application, before the service returns its "
+ "StreamObserver");
this.onCloseHandler = onCloseHandler;
}
}

/**
Expand Down
72 changes: 72 additions & 0 deletions stub/src/test/java/io/grpc/stub/ServerCallsTest.java
Expand Up @@ -199,6 +199,53 @@ public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver)
callObserver.get().onCompleted();
}

@Test
public void onCloseHandlerCalledIfSetInStreamingClientCall() throws Exception {
final AtomicBoolean onCloseHandlerCalled = new AtomicBoolean();
ServerCallHandler<Integer, Integer> callHandler = ServerCalls.asyncBidiStreamingCall(
new ServerCalls.BidiStreamingMethod<Integer, Integer>() {
@Override
public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver) {
ServerCallStreamObserver<Integer> serverCallObserver =
(ServerCallStreamObserver<Integer>) responseObserver;
serverCallObserver.setOnCloseHandler(new Runnable() {
@Override
public void run() {
onCloseHandlerCalled.set(true);
}
});
return new ServerCalls.NoopStreamObserver<>();
}
});
ServerCall.Listener<Integer> callListener = callHandler.startCall(serverCall, new Metadata());
callListener.onComplete();
assertTrue(onCloseHandlerCalled.get());
}

@Test
public void onCloseHandlerCalledIfSetInUnaryClientCall() throws Exception {
final AtomicBoolean onCloseHandlerCalled = new AtomicBoolean();
ServerCallHandler<Integer, Integer> callHandler = ServerCalls.asyncServerStreamingCall(
new ServerCalls.ServerStreamingMethod<Integer, Integer>() {
@Override
public void invoke(Integer request, StreamObserver<Integer> responseObserver) {
ServerCallStreamObserver<Integer> serverCallObserver =
(ServerCallStreamObserver<Integer>) responseObserver;
serverCallObserver.setOnCloseHandler(new Runnable() {
@Override
public void run() {
onCloseHandlerCalled.set(true);
}
});
}
});
ServerCall.Listener<Integer> callListener = callHandler.startCall(serverCall, new Metadata());
callListener.onMessage(0);
callListener.onHalfClose();
callListener.onComplete();
assertTrue(onCloseHandlerCalled.get());
}

@Test
public void cannotSetOnCancelHandlerAfterServiceInvocation() throws Exception {
final AtomicReference<ServerCallStreamObserver<Integer>> callObserver =
Expand Down Expand Up @@ -255,6 +302,31 @@ public void run() {
}
}

@Test
public void cannotSetOnCloseHandlerAfterServiceInvocation() throws Exception {
final AtomicReference<ServerCallStreamObserver<Integer>> callObserver = new AtomicReference<>();
ServerCallHandler<Integer, Integer> callHandler = ServerCalls.asyncBidiStreamingCall(
new ServerCalls.BidiStreamingMethod<Integer, Integer>() {
@Override
public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver) {
callObserver.set((ServerCallStreamObserver<Integer>) responseObserver);
return new ServerCalls.NoopStreamObserver<>();
}
});
ServerCall.Listener<Integer> callListener = callHandler.startCall(serverCall, new Metadata());
callListener.onMessage(1);
try {
callObserver.get().setOnCloseHandler(new Runnable() {
@Override
public void run() {
}
});
fail("Cannot set onReady after service invocation");
} catch (IllegalStateException expected) {
// Expected
}
}

@Test
public void cannotDisableAutoRequestAfterServiceInvocation() throws Exception {
final AtomicReference<ServerCallStreamObserver<Integer>> callObserver =
Expand Down