Skip to content

Commit

Permalink
Notify blocking stub executor on message. (line#2066)
Browse files Browse the repository at this point in the history
I had no idea blocking stubs supported querying server streaming endpoints like this. Turns out we need to notify a call's executor when messages are received so a blocking stub can read individual messages, not just the final one as we currently support.

Fixes line#2065
  • Loading branch information
anuraaga authored and trustin committed Sep 16, 2019
1 parent f8e572f commit bec8eff
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,8 @@ public void messageRead(ByteBufOrStream message) {
req.close(GrpcStatus.fromThrowable(t).asException());
throw t instanceof RuntimeException ? (RuntimeException) t : new RuntimeException(t);
}

notifyExecutor();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
Expand Down Expand Up @@ -357,6 +358,45 @@ public void serverStreaming() throws Exception {
});
}

@Test
public void serverStreaming_blocking() throws Exception {
final StreamingOutputCallRequest request =
StreamingOutputCallRequest.newBuilder()
.setResponseType(COMPRESSABLE)
.addResponseParameters(
ResponseParameters.newBuilder()
.setSize(31415)
.setIntervalUs(1000))
.addResponseParameters(ResponseParameters.newBuilder()
.setSize(9)
.setIntervalUs(1000))
.build();
final List<StreamingOutputCallResponse> goldenResponses = Arrays.asList(
StreamingOutputCallResponse.newBuilder()
.setPayload(Payload.newBuilder()
.setType(COMPRESSABLE)
.setBody(ByteString.copyFrom(new byte[31415])))
.build(),
StreamingOutputCallResponse.newBuilder()
.setPayload(Payload.newBuilder()
.setType(COMPRESSABLE)
.setBody(ByteString.copyFrom(new byte[9])))
.build());

final List<StreamingOutputCallResponse> responses = new ArrayList<>();
final Iterator<StreamingOutputCallResponse> it = blockingStub.streamingOutputCall(request);
while (it.hasNext()) {
responses.add(it.next());
}

assertThat(responses).containsExactlyElementsOf(goldenResponses);

checkRequestLog((rpcReq, rpcRes, grpcStatus) -> {
assertThat(rpcReq.params()).containsExactly(request);
assertThat(rpcRes.get()).isEqualTo(goldenResponses.get(0));
});
}

@Test
public void clientStreaming() throws Exception {
final List<StreamingInputCallRequest> requests = Arrays.asList(
Expand Down

0 comments on commit bec8eff

Please sign in to comment.