Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stub: add ServerCallStreamObserver.setOnCloseHandler(...) (#5895) #8452

Merged
merged 11 commits into from Sep 21, 2021
Merged
17 changes: 17 additions & 0 deletions stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
Expand Up @@ -143,4 +143,21 @@ public void disableAutoRequest() {
*/
@Override
public abstract void setMessageCompression(boolean enable);

/**
* Sets a {@link Runnable} to be called when the call is successfully completed from the server's
* point of view: all the messages have been put on the wire and the stream has been closed.
morgwai marked this conversation as resolved.
Show resolved Hide resolved
* Note however that the client still may have not received all the messages due to race
morgwai marked this conversation as resolved.
Show resolved Hide resolved
* conditions or crashes.
*
* <p>It is guaranteed that execution of the {@link Runnable} is serialized with calls to the
* 'inbound' {@link StreamObserver}. That also means that the callback will be delayed if other
* callbacks are running.</p>
*
* <p>This method may only be called during the initial call to the application, before the
* service returns its {@code StreamObserver}.</p>
*
* @param onSuccessHandler to call when the call has been successfully completed
*/
public abstract void setOnSuccessHandler(Runnable onSuccessHandler);
morgwai marked this conversation as resolved.
Show resolved Hide resolved
}
23 changes: 23 additions & 0 deletions stub/src/main/java/io/grpc/stub/ServerCalls.java
Expand Up @@ -206,6 +206,13 @@ public void onReady() {
responseObserver.onReadyHandler.run();
}
}

@Override
public void onComplete() {
if (responseObserver.onSuccessHandler != null) {
responseObserver.onSuccessHandler.run();
}
}
}
}

Expand Down Expand Up @@ -291,6 +298,13 @@ public void onReady() {
responseObserver.onReadyHandler.run();
}
}

@Override
public void onComplete() {
if (responseObserver.onSuccessHandler != null) {
responseObserver.onSuccessHandler.run();
}
}
}
}

Expand Down Expand Up @@ -320,6 +334,7 @@ private static final class ServerCallStreamObserverImpl<ReqT, RespT>
private Runnable onCancelHandler;
private boolean aborted = false;
private boolean completed = false;
private Runnable onSuccessHandler;

// Non private to avoid synthetic class
ServerCallStreamObserverImpl(ServerCall<ReqT, RespT> call, boolean serverStreamingOrBidi) {
Expand Down Expand Up @@ -423,6 +438,14 @@ public void disableAutoRequest() {
public void request(int count) {
call.request(count);
}

@Override
public void setOnSuccessHandler(Runnable onSuccessHandler) {
checkState(!frozen, "Cannot alter onSuccessHandler after initialization. May only be called "
+ "during the initial call to the application, before the service returns its "
+ "StreamObserver");
this.onSuccessHandler = onSuccessHandler;
}
}

/**
Expand Down
56 changes: 56 additions & 0 deletions stub/src/test/java/io/grpc/stub/ServerCallsTest.java
Expand Up @@ -199,6 +199,34 @@ public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver)
callObserver.get().onCompleted();
}

@Test
public void onSuccessHandlerCalledIfSet() throws Exception {
final AtomicBoolean onSuccess = new AtomicBoolean();
final AtomicReference<ServerCallStreamObserver<Integer>> callObserver =
new AtomicReference<>();
ServerCallHandler<Integer, Integer> callHandler =
ServerCalls.asyncBidiStreamingCall(
new ServerCalls.BidiStreamingMethod<Integer, Integer>() {
@Override
public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver) {
ServerCallStreamObserver<Integer> serverCallObserver =
(ServerCallStreamObserver<Integer>) responseObserver;
callObserver.set(serverCallObserver);
serverCallObserver.setOnSuccessHandler(new Runnable() {
@Override
public void run() {
onSuccess.set(true);
}
});
return new ServerCalls.NoopStreamObserver<>();
}
});
ServerCall.Listener<Integer> callListener =
callHandler.startCall(serverCall, new Metadata());
callListener.onComplete();
assertTrue(onSuccess.get());
}

@Test
public void cannotSetOnCancelHandlerAfterServiceInvocation() throws Exception {
final AtomicReference<ServerCallStreamObserver<Integer>> callObserver =
Expand Down Expand Up @@ -255,6 +283,34 @@ public void run() {
}
}

@Test
public void cannotSetOnSuccessHandlerAfterServiceInvocation() throws Exception {
final AtomicReference<ServerCallStreamObserver<Integer>> callObserver =
new AtomicReference<>();
ServerCallHandler<Integer, Integer> callHandler =
ServerCalls.asyncBidiStreamingCall(
new ServerCalls.BidiStreamingMethod<Integer, Integer>() {
@Override
public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver) {
callObserver.set((ServerCallStreamObserver<Integer>) responseObserver);
return new ServerCalls.NoopStreamObserver<>();
}
});
ServerCall.Listener<Integer> callListener =
callHandler.startCall(serverCall, new Metadata());
callListener.onMessage(1);
try {
callObserver.get().setOnSuccessHandler(new Runnable() {
@Override
public void run() {
}
});
fail("Cannot set onReady after service invocation");
} catch (IllegalStateException expected) {
// Expected
}
}

@Test
public void cannotDisableAutoRequestAfterServiceInvocation() throws Exception {
final AtomicReference<ServerCallStreamObserver<Integer>> callObserver =
Expand Down