Skip to content

Commit

Permalink
stub: add ServerCallStreamObserver.setOnSuccessHandler(...) (grpc#5895)
Browse files Browse the repository at this point in the history
This allows for user code to be notified when the messages are actually
put on the wire and the stream is closed.
  • Loading branch information
morgwai committed Aug 26, 2021
1 parent 3cb0696 commit b68d403
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 0 deletions.
17 changes: 17 additions & 0 deletions stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
Expand Up @@ -143,4 +143,21 @@ public void disableAutoRequest() {
*/
@Override
public abstract void setMessageCompression(boolean enable);

/**
* Sets a {@link Runnable} to be called when the call is successfully completed from the server's
* point of view: all the messages have been put on the wire and the stream has been closed.
* Note however that the client still may have not received all the messages due to race
* conditions or crashes.
*
* <p>It is guaranteed that execution of the {@link Runnable} is serialized with calls to the
* 'inbound' {@link StreamObserver}. That also means that the callback will be delayed if other
* callbacks are running.</p>
*
* <p>This method may only be called during the initial call to the application, before the
* service returns its {@code StreamObserver}.</p>
*
* @param onSuccessHandler to call when the call has been successfully completed
*/
public abstract void setOnSuccessHandler(Runnable onSuccessHandler);
}
23 changes: 23 additions & 0 deletions stub/src/main/java/io/grpc/stub/ServerCalls.java
Expand Up @@ -206,6 +206,13 @@ public void onReady() {
responseObserver.onReadyHandler.run();
}
}

@Override
public void onComplete() {
if (responseObserver.onSuccessHandler != null) {
responseObserver.onSuccessHandler.run();
}
}
}
}

Expand Down Expand Up @@ -291,6 +298,13 @@ public void onReady() {
responseObserver.onReadyHandler.run();
}
}

@Override
public void onComplete() {
if (responseObserver.onSuccessHandler != null) {
responseObserver.onSuccessHandler.run();
}
}
}
}

Expand Down Expand Up @@ -320,6 +334,7 @@ private static final class ServerCallStreamObserverImpl<ReqT, RespT>
private Runnable onCancelHandler;
private boolean aborted = false;
private boolean completed = false;
private Runnable onSuccessHandler;

// Non private to avoid synthetic class
ServerCallStreamObserverImpl(ServerCall<ReqT, RespT> call, boolean serverStreamingOrBidi) {
Expand Down Expand Up @@ -423,6 +438,14 @@ public void disableAutoRequest() {
public void request(int count) {
call.request(count);
}

@Override
public void setOnSuccessHandler(Runnable onSuccessHandler) {
checkState(!frozen, "Cannot alter onSuccessHandler after initialization. May only be called "
+ "during the initial call to the application, before the service returns its "
+ "StreamObserver");
this.onSuccessHandler = onSuccessHandler;
}
}

/**
Expand Down
56 changes: 56 additions & 0 deletions stub/src/test/java/io/grpc/stub/ServerCallsTest.java
Expand Up @@ -199,6 +199,34 @@ public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver)
callObserver.get().onCompleted();
}

@Test
public void onSuccessHandlerCalledIfSet() throws Exception {
final AtomicBoolean onSuccess = new AtomicBoolean();
final AtomicReference<ServerCallStreamObserver<Integer>> callObserver =
new AtomicReference<>();
ServerCallHandler<Integer, Integer> callHandler =
ServerCalls.asyncBidiStreamingCall(
new ServerCalls.BidiStreamingMethod<Integer, Integer>() {
@Override
public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver) {
ServerCallStreamObserver<Integer> serverCallObserver =
(ServerCallStreamObserver<Integer>) responseObserver;
callObserver.set(serverCallObserver);
serverCallObserver.setOnSuccessHandler(new Runnable() {
@Override
public void run() {
onSuccess.set(true);
}
});
return new ServerCalls.NoopStreamObserver<>();
}
});
ServerCall.Listener<Integer> callListener =
callHandler.startCall(serverCall, new Metadata());
callListener.onComplete();
assertTrue(onSuccess.get());
}

@Test
public void cannotSetOnCancelHandlerAfterServiceInvocation() throws Exception {
final AtomicReference<ServerCallStreamObserver<Integer>> callObserver =
Expand Down Expand Up @@ -255,6 +283,34 @@ public void run() {
}
}

@Test
public void cannotSetOnSuccessHandlerAfterServiceInvocation() throws Exception {
final AtomicReference<ServerCallStreamObserver<Integer>> callObserver =
new AtomicReference<>();
ServerCallHandler<Integer, Integer> callHandler =
ServerCalls.asyncBidiStreamingCall(
new ServerCalls.BidiStreamingMethod<Integer, Integer>() {
@Override
public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver) {
callObserver.set((ServerCallStreamObserver<Integer>) responseObserver);
return new ServerCalls.NoopStreamObserver<>();
}
});
ServerCall.Listener<Integer> callListener =
callHandler.startCall(serverCall, new Metadata());
callListener.onMessage(1);
try {
callObserver.get().setOnSuccessHandler(new Runnable() {
@Override
public void run() {
}
});
fail("Cannot set onReady after service invocation");
} catch (IllegalStateException expected) {
// Expected
}
}

@Test
public void cannotDisableAutoRequestAfterServiceInvocation() throws Exception {
final AtomicReference<ServerCallStreamObserver<Integer>> callObserver =
Expand Down

0 comments on commit b68d403

Please sign in to comment.