Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add disableAutoRequest method that disables all automatic inbound flow-control requests #6807

Merged
merged 24 commits into from May 6, 2020
Merged
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
87bb38a
Update ClientCalls to disable initial flow-control
DRayX Mar 5, 2020
86752b0
Update stub/src/main/java/io/grpc/stub/ClientCalls.java
DRayX Mar 5, 2020
7029ad7
Add new disableAutoRequest method
DRayX Mar 9, 2020
4fd1176
Add @Deprecated annotation to overrides
DRayX Mar 9, 2020
2a03647
Update ClientResponseObserver comment
DRayX Mar 9, 2020
07bb082
Update Manual Flow Control examples
DRayX Mar 9, 2020
1e8181a
Update ProtoReflectionService to use disableAutoRequest
DRayX Mar 9, 2020
24d6763
Fixed mistake in comment
DRayX Mar 9, 2020
c7083ae
Fix flowControlOnCompleteWithPendingRequest test
DRayX Mar 9, 2020
2014d06
Update ManualFlowControlClient example
DRayX Mar 10, 2020
4f90e0d
Update {Client|Server}CallsTest
DRayX Mar 10, 2020
80fc740
Update ClientCalls.java
DRayX Mar 10, 2020
c149edc
Update CallStreamObserver.java
DRayX Mar 10, 2020
effb394
Add a note about migrating from disableAutoInboundFlowControl to disa…
DRayX Mar 10, 2020
b88f02f
Update CallStreamObserver.java
DRayX Mar 10, 2020
4285b05
Update CallStreamObserver.java
DRayX Mar 11, 2020
d2b25e6
Switch to disableAutoRequestWithInitial
DRayX Apr 4, 2020
14ce757
Update ClientCalls.java
DRayX Apr 4, 2020
b83f94f
Update ClientCalls.java
DRayX Apr 4, 2020
1839faf
Fix more typos
DRayX Apr 4, 2020
14d2276
Update ServerCalls.java
DRayX Apr 4, 2020
da1d109
fix bug causing hanging servers
ejona86 May 4, 2020
371bb88
Remove `@Deprecated` from disableAutoInboundFlowControl
DRayX May 5, 2020
23d5b17
Fix style
DRayX May 5, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
67 changes: 43 additions & 24 deletions stub/src/main/java/io/grpc/stub/ClientCalls.java
Expand Up @@ -162,7 +162,7 @@ public static <ReqT, RespT> RespT blockingUnaryCall(
public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
ClientCall<ReqT, RespT> call, ReqT req) {
BlockingResponseStream<RespT> result = new BlockingResponseStream<>(call);
asyncUnaryRequestCall(call, req, result.listener(), true);
asyncUnaryRequestCall(call, req, result.listener());
return result;
}

Expand All @@ -179,7 +179,7 @@ public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
ThreadlessExecutor executor = new ThreadlessExecutor();
ClientCall<ReqT, RespT> call = channel.newCall(method, callOptions.withExecutor(executor));
BlockingResponseStream<RespT> result = new BlockingResponseStream<>(call, executor);
asyncUnaryRequestCall(call, req, result.listener(), true);
asyncUnaryRequestCall(call, req, result.listener());
return result;
}

Expand All @@ -193,7 +193,7 @@ public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
public static <ReqT, RespT> ListenableFuture<RespT> futureUnaryCall(
ClientCall<ReqT, RespT> call, ReqT req) {
GrpcFuture<RespT> responseFuture = new GrpcFuture<>(call);
asyncUnaryRequestCall(call, req, new UnaryStreamToFuture<>(responseFuture), false);
asyncUnaryRequestCall(call, req, new UnaryStreamToFuture<>(responseFuture));
return responseFuture;
}

Expand Down Expand Up @@ -275,16 +275,14 @@ private static <ReqT, RespT> void asyncUnaryRequestCall(
new StreamObserverToCallListenerAdapter<>(
responseObserver,
new CallToStreamObserverAdapter<>(call),
streamingResponse),
streamingResponse);
streamingResponse));
}

private static <ReqT, RespT> void asyncUnaryRequestCall(
ClientCall<ReqT, RespT> call,
ReqT req,
ClientCall.Listener<RespT> responseListener,
boolean streamingResponse) {
startCall(call, responseListener, streamingResponse);
StartableListener<RespT> responseListener) {
startCall(call, responseListener);
try {
call.sendMessage(req);
call.halfClose();
Expand All @@ -303,23 +301,19 @@ private static <ReqT, RespT> StreamObserver<ReqT> asyncStreamingRequestCall(
startCall(
call,
new StreamObserverToCallListenerAdapter<>(
responseObserver, adapter, streamingResponse),
streamingResponse);
responseObserver, adapter, streamingResponse));
return adapter;
}

private static <ReqT, RespT> void startCall(
ClientCall<ReqT, RespT> call,
ClientCall.Listener<RespT> responseListener,
boolean streamingResponse) {
StartableListener<RespT> responseListener) {
call.start(responseListener, new Metadata());
if (streamingResponse) {
call.request(1);
} else {
// Initially ask for two responses from flow-control so that if a misbehaving server sends
// more than one responses, we can catch it and fail it in the listener.
call.request(2);
}
responseListener.onStart();
}

private abstract static class StartableListener<T> extends ClientCall.Listener<T> {
abstract void onStart();
}

private static final class CallToStreamObserverAdapter<T> extends ClientCallStreamObserver<T> {
Expand Down Expand Up @@ -398,7 +392,7 @@ public void cancel(@Nullable String message, @Nullable Throwable cause) {
}

private static final class StreamObserverToCallListenerAdapter<ReqT, RespT>
extends ClientCall.Listener<RespT> {
extends StartableListener<RespT> {
private final StreamObserver<RespT> observer;
private final CallToStreamObserverAdapter<ReqT> adapter;
private final boolean streamingResponse;
Expand Down Expand Up @@ -456,12 +450,23 @@ public void onReady() {
adapter.onReadyHandler.run();
}
}

@Override
void onStart() {
if (adapter.autoFlowControlEnabled) {
if (streamingResponse) {
adapter.request(1);
} else {
adapter.request(2);
}
}
}
}

/**
* Completes a {@link GrpcFuture} using {@link StreamObserver} events.
*/
private static final class UnaryStreamToFuture<RespT> extends ClientCall.Listener<RespT> {
private static final class UnaryStreamToFuture<ReqT, RespT> extends StartableListener<RespT> {
DRayX marked this conversation as resolved.
Show resolved Hide resolved
private final GrpcFuture<RespT> responseFuture;
private RespT value;

Expand Down Expand Up @@ -497,6 +502,11 @@ public void onClose(Status status, Metadata trailers) {
responseFuture.setException(status.asRuntimeException(trailers));
}
}

@Override
void onStart() {
responseFuture.request(2);
}
}

private static final class GrpcFuture<RespT> extends AbstractFuture<RespT> {
Expand Down Expand Up @@ -526,6 +536,10 @@ protected boolean setException(Throwable throwable) {
protected String pendingToString() {
return MoreObjects.toStringHelper(this).add("clientCall", call).toString();
}

void request(int numMessages) {
call.request(numMessages);
}
}

/**
Expand All @@ -538,7 +552,7 @@ protected String pendingToString() {
private static final class BlockingResponseStream<T> implements Iterator<T> {
// Due to flow control, only needs to hold up to 2 items: 1 for value, 1 for close.
private final BlockingQueue<Object> buffer = new ArrayBlockingQueue<>(2);
private final ClientCall.Listener<T> listener = new QueuingListener();
private final StartableListener<T> listener = new QueuingListener();
private final ClientCall<?, T> call;
/** May be null. */
private final ThreadlessExecutor threadless;
Expand All @@ -556,7 +570,7 @@ private static final class BlockingResponseStream<T> implements Iterator<T> {
this.threadless = threadless;
}

ClientCall.Listener<T> listener() {
StartableListener<T> listener() {
return listener;
}

Expand Down Expand Up @@ -628,7 +642,7 @@ public void remove() {
throw new UnsupportedOperationException();
}

private final class QueuingListener extends ClientCall.Listener<T> {
private final class QueuingListener extends StartableListener<T> {
// Non private to avoid synthetic class
QueuingListener() {}

Expand All @@ -654,6 +668,11 @@ public void onClose(Status status, Metadata trailers) {
}
done = true;
}

@Override
void onStart() {
call.request(1);
}
}
}

Expand Down