Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add disableAutoRequest method that disables all automatic inbound flow-control requests #6807

Merged
merged 24 commits into from May 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
87bb38a
Update ClientCalls to disable initial flow-control
DRayX Mar 5, 2020
86752b0
Update stub/src/main/java/io/grpc/stub/ClientCalls.java
DRayX Mar 5, 2020
7029ad7
Add new disableAutoRequest method
DRayX Mar 9, 2020
4fd1176
Add @Deprecated annotation to overrides
DRayX Mar 9, 2020
2a03647
Update ClientResponseObserver comment
DRayX Mar 9, 2020
07bb082
Update Manual Flow Control examples
DRayX Mar 9, 2020
1e8181a
Update ProtoReflectionService to use disableAutoRequest
DRayX Mar 9, 2020
24d6763
Fixed mistake in comment
DRayX Mar 9, 2020
c7083ae
Fix flowControlOnCompleteWithPendingRequest test
DRayX Mar 9, 2020
2014d06
Update ManualFlowControlClient example
DRayX Mar 10, 2020
4f90e0d
Update {Client|Server}CallsTest
DRayX Mar 10, 2020
80fc740
Update ClientCalls.java
DRayX Mar 10, 2020
c149edc
Update CallStreamObserver.java
DRayX Mar 10, 2020
effb394
Add a note about migrating from disableAutoInboundFlowControl to disa…
DRayX Mar 10, 2020
b88f02f
Update CallStreamObserver.java
DRayX Mar 10, 2020
4285b05
Update CallStreamObserver.java
DRayX Mar 11, 2020
d2b25e6
Switch to disableAutoRequestWithInitial
DRayX Apr 4, 2020
14ce757
Update ClientCalls.java
DRayX Apr 4, 2020
b83f94f
Update ClientCalls.java
DRayX Apr 4, 2020
1839faf
Fix more typos
DRayX Apr 4, 2020
14d2276
Update ServerCalls.java
DRayX Apr 4, 2020
da1d109
fix bug causing hanging servers
ejona86 May 4, 2020
371bb88
Remove `@Deprecated` from disableAutoInboundFlowControl
DRayX May 5, 2020
23d5b17
Fix style
DRayX May 5, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -54,7 +54,7 @@ public void beforeStart(final ClientCallStreamObserver<HelloRequest> requestStre
this.requestStream = requestStream;
// Set up manual flow control for the response stream. It feels backwards to configure the response
// stream's flow control using the request stream's observer, but this is the way it is.
requestStream.disableAutoInboundFlowControl();
requestStream.disableAutoRequestWithInitial(1);

// Set up a back-pressure-aware producer for the request stream. The onReadyHandler will be invoked
// when the consuming side has enough buffer space to receive more messages.
Expand Down
Expand Up @@ -39,7 +39,7 @@ public StreamObserver<HelloRequest> sayHelloStreaming(final StreamObserver<Hello
// stream's flow control using the response stream's observer, but this is the way it is.
final ServerCallStreamObserver<HelloReply> serverCallStreamObserver =
(ServerCallStreamObserver<HelloReply>) responseObserver;
serverCallStreamObserver.disableAutoInboundFlowControl();
serverCallStreamObserver.disableAutoRequestWithInitial(0);

// Set up a back-pressure-aware consumer for the request stream. The onReadyHandler will be invoked
// when the consuming side has enough buffer space to receive more messages.
Expand Down
Expand Up @@ -135,8 +135,7 @@ public StreamObserver<ServerReflectionRequest> serverReflectionInfo(
ProtoReflectionStreamObserver requestObserver =
new ProtoReflectionStreamObserver(updateIndexIfNecessary(), serverCallStreamObserver);
serverCallStreamObserver.setOnReadyHandler(requestObserver);
serverCallStreamObserver.disableAutoInboundFlowControl();
serverCallStreamObserver.request(1);
serverCallStreamObserver.disableAutoRequestWithInitial(1);
return requestObserver;
}

Expand Down
Expand Up @@ -532,8 +532,11 @@ public void flowControl() throws Exception {
(ClientCallStreamObserver<ServerReflectionRequest>)
stub.serverReflectionInfo(clientResponseObserver);

// ClientCalls.startCall() calls request(1) initially, so we should get an immediate response.
// Verify we don't receive a response until we request it.
requestObserver.onNext(flowControlRequest);
assertEquals(0, clientResponseObserver.getResponses().size());

requestObserver.request(1);
assertEquals(1, clientResponseObserver.getResponses().size());
assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(0));

Expand All @@ -557,17 +560,15 @@ public void flowControlOnCompleteWithPendingRequest() throws Exception {
(ClientCallStreamObserver<ServerReflectionRequest>)
stub.serverReflectionInfo(clientResponseObserver);

// ClientCalls.startCall() calls request(1) initially, so make additional request.
requestObserver.onNext(flowControlRequest);
requestObserver.onNext(flowControlRequest);
requestObserver.onCompleted();
assertEquals(1, clientResponseObserver.getResponses().size());
assertEquals(0, clientResponseObserver.getResponses().size());
assertFalse(clientResponseObserver.onCompleteCalled());

requestObserver.request(1);
assertTrue(clientResponseObserver.onCompleteCalled());
assertEquals(2, clientResponseObserver.getResponses().size());
assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(1));
assertEquals(1, clientResponseObserver.getResponses().size());
assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(0));
}

private final ServerReflectionRequest flowControlRequest =
Expand All @@ -594,7 +595,7 @@ private static class FlowControlClientResponseObserver

@Override
public void beforeStart(final ClientCallStreamObserver<ServerReflectionRequest> requestStream) {
requestStream.disableAutoInboundFlowControl();
requestStream.disableAutoRequestWithInitial(0);
}

@Override
Expand Down
28 changes: 28 additions & 0 deletions stub/src/main/java/io/grpc/stub/CallStreamObserver.java
Expand Up @@ -106,9 +106,37 @@ public abstract class CallStreamObserver<V> implements StreamObserver<V> {
* </li>
* </ul>
* </p>
*
* <p>To migrate to {@link #disableAutoRequestWithInitial} on the server side, call
* {@code disableAutoRequestWithInitial(0)} as {@code disableAutoInboundFlowControl}
* already disables all inbound requests. On the client side, {@code
* disableAutoRequestWithInitial(1)} should be called to maintain existing behavior as
* {@code disableAutoInboundFlowControl} does not disable the initial request.
*/
public abstract void disableAutoInboundFlowControl();

/**
* Disables automatic flow control where an additional message is requested to be read after a
* call to the 'inbound' {@link io.grpc.stub.StreamObserver#onNext(Object)} has completed. A
* number of initial requests to make when the call is started may be specified.
*
* <p>On client-side this method may only be called during {@link
* ClientResponseObserver#beforeStart}. On server-side it may only be called during the initial
* call to the application, before the service returns its {@code StreamObserver}.
*
* <p>Note that for server-side cases where the message is received before the handler is invoked,
* this method will have no effect. This is true for:
*
* <ul>
* <li>{@link io.grpc.MethodDescriptor.MethodType#UNARY} operations.</li>
* <li>{@link io.grpc.MethodDescriptor.MethodType#SERVER_STREAMING} operations.</li>
* </ul>
* </p>
*
* <p>This API is still a work in-progress and will likely change in the future.
*/
public abstract void disableAutoRequestWithInitial(int request);

/**
* Requests the peer to produce {@code count} more messages to be delivered to the 'inbound'
* {@link StreamObserver}.
Expand Down
98 changes: 61 additions & 37 deletions stub/src/main/java/io/grpc/stub/ClientCalls.java
Expand Up @@ -162,7 +162,7 @@ public static <ReqT, RespT> RespT blockingUnaryCall(
public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
ClientCall<ReqT, RespT> call, ReqT req) {
BlockingResponseStream<RespT> result = new BlockingResponseStream<>(call);
asyncUnaryRequestCall(call, req, result.listener(), true);
asyncUnaryRequestCall(call, req, result.listener());
return result;
}

Expand All @@ -179,7 +179,7 @@ public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
ThreadlessExecutor executor = new ThreadlessExecutor();
ClientCall<ReqT, RespT> call = channel.newCall(method, callOptions.withExecutor(executor));
BlockingResponseStream<RespT> result = new BlockingResponseStream<>(call, executor);
asyncUnaryRequestCall(call, req, result.listener(), true);
asyncUnaryRequestCall(call, req, result.listener());
return result;
}

Expand All @@ -193,7 +193,7 @@ public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
public static <ReqT, RespT> ListenableFuture<RespT> futureUnaryCall(
ClientCall<ReqT, RespT> call, ReqT req) {
GrpcFuture<RespT> responseFuture = new GrpcFuture<>(call);
asyncUnaryRequestCall(call, req, new UnaryStreamToFuture<>(responseFuture), false);
asyncUnaryRequestCall(call, req, new UnaryStreamToFuture<>(responseFuture));
return responseFuture;
}

Expand Down Expand Up @@ -274,17 +274,14 @@ private static <ReqT, RespT> void asyncUnaryRequestCall(
req,
new StreamObserverToCallListenerAdapter<>(
responseObserver,
new CallToStreamObserverAdapter<>(call),
streamingResponse),
streamingResponse);
new CallToStreamObserverAdapter<>(call, streamingResponse)));
}

private static <ReqT, RespT> void asyncUnaryRequestCall(
ClientCall<ReqT, RespT> call,
ReqT req,
ClientCall.Listener<RespT> responseListener,
boolean streamingResponse) {
startCall(call, responseListener, streamingResponse);
StartableListener<RespT> responseListener) {
startCall(call, responseListener);
try {
call.sendMessage(req);
call.halfClose();
Expand All @@ -299,40 +296,39 @@ private static <ReqT, RespT> StreamObserver<ReqT> asyncStreamingRequestCall(
ClientCall<ReqT, RespT> call,
StreamObserver<RespT> responseObserver,
boolean streamingResponse) {
CallToStreamObserverAdapter<ReqT> adapter = new CallToStreamObserverAdapter<>(call);
CallToStreamObserverAdapter<ReqT> adapter = new CallToStreamObserverAdapter<>(
call, streamingResponse);
startCall(
call,
new StreamObserverToCallListenerAdapter<>(
responseObserver, adapter, streamingResponse),
streamingResponse);
new StreamObserverToCallListenerAdapter<>(responseObserver, adapter));
return adapter;
}

private static <ReqT, RespT> void startCall(
ClientCall<ReqT, RespT> call,
ClientCall.Listener<RespT> responseListener,
boolean streamingResponse) {
StartableListener<RespT> responseListener) {
call.start(responseListener, new Metadata());
if (streamingResponse) {
call.request(1);
} else {
// Initially ask for two responses from flow-control so that if a misbehaving server sends
// more than one responses, we can catch it and fail it in the listener.
call.request(2);
}
responseListener.onStart();
}

private abstract static class StartableListener<T> extends ClientCall.Listener<T> {
abstract void onStart();
}

private static final class CallToStreamObserverAdapter<T> extends ClientCallStreamObserver<T> {
private boolean frozen;
private final ClientCall<T, ?> call;
private final boolean streamingResponse;
private Runnable onReadyHandler;
private boolean autoFlowControlEnabled = true;
private int initialRequest = 1;
private boolean autoRequestEnabled = true;
private boolean aborted = false;
private boolean completed = false;

// Non private to avoid synthetic class
CallToStreamObserverAdapter(ClientCall<T, ?> call) {
CallToStreamObserverAdapter(ClientCall<T, ?> call, boolean streamingResponse) {
this.call = call;
this.streamingResponse = streamingResponse;
}

private void freeze() {
Expand Down Expand Up @@ -372,18 +368,32 @@ public void setOnReadyHandler(Runnable onReadyHandler) {
this.onReadyHandler = onReadyHandler;
}

@Deprecated
@Override
public void disableAutoInboundFlowControl() {
disableAutoRequestWithInitial(1);
}

@Override
public void disableAutoRequestWithInitial(int request) {
if (frozen) {
throw new IllegalStateException(
"Cannot disable auto flow control after call started. Use ClientResponseObserver");
}
autoFlowControlEnabled = false;
Preconditions.checkArgument(request >= 0, "Initial requests must be non-negative");
initialRequest = request;
autoRequestEnabled = false;
}

@Override
DRayX marked this conversation as resolved.
Show resolved Hide resolved
public void request(int count) {
call.request(count);
if (!streamingResponse && count == 1) {
// Initially ask for two responses from flow-control so that if a misbehaving server
// sends more than one responses, we can catch it and fail it in the listener.
call.request(2);
} else {
call.request(count);
}
}

@Override
Expand All @@ -398,19 +408,16 @@ public void cancel(@Nullable String message, @Nullable Throwable cause) {
}

private static final class StreamObserverToCallListenerAdapter<ReqT, RespT>
extends ClientCall.Listener<RespT> {
extends StartableListener<RespT> {
private final StreamObserver<RespT> observer;
private final CallToStreamObserverAdapter<ReqT> adapter;
private final boolean streamingResponse;
private boolean firstResponseReceived;

// Non private to avoid synthetic class
StreamObserverToCallListenerAdapter(
StreamObserver<RespT> observer,
CallToStreamObserverAdapter<ReqT> adapter,
boolean streamingResponse) {
CallToStreamObserverAdapter<ReqT> adapter) {
this.observer = observer;
this.streamingResponse = streamingResponse;
this.adapter = adapter;
if (observer instanceof ClientResponseObserver) {
@SuppressWarnings("unchecked")
Expand All @@ -427,15 +434,15 @@ public void onHeaders(Metadata headers) {

@Override
public void onMessage(RespT message) {
if (firstResponseReceived && !streamingResponse) {
if (firstResponseReceived && !adapter.streamingResponse) {
throw Status.INTERNAL
.withDescription("More than one responses received for unary or client-streaming call")
.asRuntimeException();
}
firstResponseReceived = true;
observer.onNext(message);

if (streamingResponse && adapter.autoFlowControlEnabled) {
if (adapter.streamingResponse && adapter.autoRequestEnabled) {
// Request delivery of the next inbound message.
adapter.request(1);
}
Expand All @@ -456,12 +463,19 @@ public void onReady() {
adapter.onReadyHandler.run();
}
}

@Override
void onStart() {
if (adapter.initialRequest > 0) {
adapter.request(adapter.initialRequest);
}
}
}

/**
* Completes a {@link GrpcFuture} using {@link StreamObserver} events.
*/
private static final class UnaryStreamToFuture<RespT> extends ClientCall.Listener<RespT> {
private static final class UnaryStreamToFuture<RespT> extends StartableListener<RespT> {
private final GrpcFuture<RespT> responseFuture;
private RespT value;

Expand Down Expand Up @@ -497,6 +511,11 @@ public void onClose(Status status, Metadata trailers) {
responseFuture.setException(status.asRuntimeException(trailers));
}
}

@Override
void onStart() {
responseFuture.call.request(2);
}
}

private static final class GrpcFuture<RespT> extends AbstractFuture<RespT> {
Expand Down Expand Up @@ -538,7 +557,7 @@ protected String pendingToString() {
private static final class BlockingResponseStream<T> implements Iterator<T> {
// Due to flow control, only needs to hold up to 2 items: 1 for value, 1 for close.
private final BlockingQueue<Object> buffer = new ArrayBlockingQueue<>(2);
private final ClientCall.Listener<T> listener = new QueuingListener();
private final StartableListener<T> listener = new QueuingListener();
private final ClientCall<?, T> call;
/** May be null. */
private final ThreadlessExecutor threadless;
Expand All @@ -556,7 +575,7 @@ private static final class BlockingResponseStream<T> implements Iterator<T> {
this.threadless = threadless;
}

ClientCall.Listener<T> listener() {
StartableListener<T> listener() {
return listener;
}

Expand Down Expand Up @@ -628,7 +647,7 @@ public void remove() {
throw new UnsupportedOperationException();
}

private final class QueuingListener extends ClientCall.Listener<T> {
private final class QueuingListener extends StartableListener<T> {
// Non private to avoid synthetic class
QueuingListener() {}

Expand All @@ -654,6 +673,11 @@ public void onClose(Status status, Metadata trailers) {
}
done = true;
}

@Override
void onStart() {
call.request(1);
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions stub/src/main/java/io/grpc/stub/ClientResponseObserver.java
Expand Up @@ -30,8 +30,8 @@ public interface ClientResponseObserver<ReqT, RespT> extends StreamObserver<Resp
* onReady events, disable auto inbound flow and perform other advanced functions.
*
* <p>Only the methods {@link ClientCallStreamObserver#setOnReadyHandler(Runnable)} and
* {@link ClientCallStreamObserver#disableAutoInboundFlowControl()} may be called within this
* callback
* {@link ClientCallStreamObserver#disableAutoRequestWithInitial(int)} may be called within
* this callback
*
* <pre>
* // Copy an iterator to the request stream under flow-control
Expand Down