Skip to content

Commit

Permalink
stub: Add disableAutoRequestWithInitial that disables all automatic i…
Browse files Browse the repository at this point in the history
…nbound flow-control requests

Add a new disableAutoRequest method that disables all automatic requests while disableAutoInboundFlowControl maintains existing behavior.

The default behavior of requesting initial messages is applied even if disableAutoInboundFlowControl is called. ServerCalls disables all automatic flow control which is much more useful in case the user can't handle incoming messages until some time after the call has started.  This change creates a new StartableListener that has an onStart method that is invoked when the call is started which makes initial requests if necessary.

See #6806
  • Loading branch information
DRayX committed May 6, 2020
1 parent 0057c4f commit a9250c1
Show file tree
Hide file tree
Showing 10 changed files with 219 additions and 83 deletions.
Expand Up @@ -54,7 +54,7 @@ public void beforeStart(final ClientCallStreamObserver<HelloRequest> requestStre
this.requestStream = requestStream;
// Set up manual flow control for the response stream. It feels backwards to configure the response
// stream's flow control using the request stream's observer, but this is the way it is.
requestStream.disableAutoInboundFlowControl();
requestStream.disableAutoRequestWithInitial(1);

// Set up a back-pressure-aware producer for the request stream. The onReadyHandler will be invoked
// when the consuming side has enough buffer space to receive more messages.
Expand Down
Expand Up @@ -39,7 +39,7 @@ public StreamObserver<HelloRequest> sayHelloStreaming(final StreamObserver<Hello
// stream's flow control using the response stream's observer, but this is the way it is.
final ServerCallStreamObserver<HelloReply> serverCallStreamObserver =
(ServerCallStreamObserver<HelloReply>) responseObserver;
serverCallStreamObserver.disableAutoInboundFlowControl();
serverCallStreamObserver.disableAutoRequestWithInitial(0);

// Set up a back-pressure-aware consumer for the request stream. The onReadyHandler will be invoked
// when the consuming side has enough buffer space to receive more messages.
Expand Down
Expand Up @@ -133,8 +133,7 @@ public StreamObserver<ServerReflectionRequest> serverReflectionInfo(
ProtoReflectionStreamObserver requestObserver =
new ProtoReflectionStreamObserver(getRefreshedIndex(), serverCallStreamObserver);
serverCallStreamObserver.setOnReadyHandler(requestObserver);
serverCallStreamObserver.disableAutoInboundFlowControl();
serverCallStreamObserver.request(1);
serverCallStreamObserver.disableAutoRequestWithInitial(1);
return requestObserver;
}

Expand Down
Expand Up @@ -564,8 +564,11 @@ public void flowControl() throws Exception {
(ClientCallStreamObserver<ServerReflectionRequest>)
stub.serverReflectionInfo(clientResponseObserver);

// ClientCalls.startCall() calls request(1) initially, so we should get an immediate response.
// Verify we don't receive a response until we request it.
requestObserver.onNext(flowControlRequest);
assertEquals(0, clientResponseObserver.getResponses().size());

requestObserver.request(1);
assertEquals(1, clientResponseObserver.getResponses().size());
assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(0));

Expand All @@ -589,17 +592,15 @@ public void flowControlOnCompleteWithPendingRequest() throws Exception {
(ClientCallStreamObserver<ServerReflectionRequest>)
stub.serverReflectionInfo(clientResponseObserver);

// ClientCalls.startCall() calls request(1) initially, so make additional request.
requestObserver.onNext(flowControlRequest);
requestObserver.onNext(flowControlRequest);
requestObserver.onCompleted();
assertEquals(1, clientResponseObserver.getResponses().size());
assertEquals(0, clientResponseObserver.getResponses().size());
assertFalse(clientResponseObserver.onCompleteCalled());

requestObserver.request(1);
assertTrue(clientResponseObserver.onCompleteCalled());
assertEquals(2, clientResponseObserver.getResponses().size());
assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(1));
assertEquals(1, clientResponseObserver.getResponses().size());
assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(0));
}

private final ServerReflectionRequest flowControlRequest =
Expand All @@ -626,7 +627,7 @@ private static class FlowControlClientResponseObserver

@Override
public void beforeStart(final ClientCallStreamObserver<ServerReflectionRequest> requestStream) {
requestStream.disableAutoInboundFlowControl();
requestStream.disableAutoRequestWithInitial(0);
}

@Override
Expand Down
28 changes: 28 additions & 0 deletions stub/src/main/java/io/grpc/stub/CallStreamObserver.java
Expand Up @@ -106,9 +106,37 @@ public abstract class CallStreamObserver<V> implements StreamObserver<V> {
* </li>
* </ul>
* </p>
*
* <p>To migrate to {@link #disableAutoRequestWithInitial} on the server side, call
* {@code disableAutoRequestWithInitial(0)} as {@code disableAutoInboundFlowControl}
* already disables all inbound requests. On the client side, {@code
* disableAutoRequestWithInitial(1)} should be called to maintain existing behavior as
* {@code disableAutoInboundFlowControl} does not disable the initial request.
*/
public abstract void disableAutoInboundFlowControl();

/**
* Disables automatic flow control where an additional message is requested to be read after a
* call to the 'inbound' {@link io.grpc.stub.StreamObserver#onNext(Object)} has completed. A
* number of initial requests to make when the call is started may be specified.
*
* <p>On client-side this method may only be called during {@link
* ClientResponseObserver#beforeStart}. On server-side it may only be called during the initial
* call to the application, before the service returns its {@code StreamObserver}.
*
* <p>Note that for server-side cases where the message is received before the handler is invoked,
* this method will have no effect. This is true for:
*
* <ul>
* <li>{@link io.grpc.MethodDescriptor.MethodType#UNARY} operations.</li>
* <li>{@link io.grpc.MethodDescriptor.MethodType#SERVER_STREAMING} operations.</li>
* </ul>
* </p>
*
* <p>This API is still a work in-progress and will likely change in the future.
*/
public abstract void disableAutoRequestWithInitial(int request);

/**
* Requests the peer to produce {@code count} more messages to be delivered to the 'inbound'
* {@link StreamObserver}.
Expand Down
98 changes: 61 additions & 37 deletions stub/src/main/java/io/grpc/stub/ClientCalls.java
Expand Up @@ -164,7 +164,7 @@ public static <ReqT, RespT> RespT blockingUnaryCall(
public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
ClientCall<ReqT, RespT> call, ReqT req) {
BlockingResponseStream<RespT> result = new BlockingResponseStream<>(call);
asyncUnaryRequestCall(call, req, result.listener(), true);
asyncUnaryRequestCall(call, req, result.listener());
return result;
}

Expand All @@ -183,7 +183,7 @@ public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
callOptions.withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING)
.withExecutor(executor));
BlockingResponseStream<RespT> result = new BlockingResponseStream<>(call, executor);
asyncUnaryRequestCall(call, req, result.listener(), true);
asyncUnaryRequestCall(call, req, result.listener());
return result;
}

Expand All @@ -197,7 +197,7 @@ public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
public static <ReqT, RespT> ListenableFuture<RespT> futureUnaryCall(
ClientCall<ReqT, RespT> call, ReqT req) {
GrpcFuture<RespT> responseFuture = new GrpcFuture<>(call);
asyncUnaryRequestCall(call, req, new UnaryStreamToFuture<>(responseFuture), false);
asyncUnaryRequestCall(call, req, new UnaryStreamToFuture<>(responseFuture));
return responseFuture;
}

Expand Down Expand Up @@ -278,17 +278,14 @@ private static <ReqT, RespT> void asyncUnaryRequestCall(
req,
new StreamObserverToCallListenerAdapter<>(
responseObserver,
new CallToStreamObserverAdapter<>(call),
streamingResponse),
streamingResponse);
new CallToStreamObserverAdapter<>(call, streamingResponse)));
}

private static <ReqT, RespT> void asyncUnaryRequestCall(
ClientCall<ReqT, RespT> call,
ReqT req,
ClientCall.Listener<RespT> responseListener,
boolean streamingResponse) {
startCall(call, responseListener, streamingResponse);
StartableListener<RespT> responseListener) {
startCall(call, responseListener);
try {
call.sendMessage(req);
call.halfClose();
Expand All @@ -303,40 +300,39 @@ private static <ReqT, RespT> StreamObserver<ReqT> asyncStreamingRequestCall(
ClientCall<ReqT, RespT> call,
StreamObserver<RespT> responseObserver,
boolean streamingResponse) {
CallToStreamObserverAdapter<ReqT> adapter = new CallToStreamObserverAdapter<>(call);
CallToStreamObserverAdapter<ReqT> adapter = new CallToStreamObserverAdapter<>(
call, streamingResponse);
startCall(
call,
new StreamObserverToCallListenerAdapter<>(
responseObserver, adapter, streamingResponse),
streamingResponse);
new StreamObserverToCallListenerAdapter<>(responseObserver, adapter));
return adapter;
}

private static <ReqT, RespT> void startCall(
ClientCall<ReqT, RespT> call,
ClientCall.Listener<RespT> responseListener,
boolean streamingResponse) {
StartableListener<RespT> responseListener) {
call.start(responseListener, new Metadata());
if (streamingResponse) {
call.request(1);
} else {
// Initially ask for two responses from flow-control so that if a misbehaving server sends
// more than one responses, we can catch it and fail it in the listener.
call.request(2);
}
responseListener.onStart();
}

private abstract static class StartableListener<T> extends ClientCall.Listener<T> {
abstract void onStart();
}

private static final class CallToStreamObserverAdapter<T> extends ClientCallStreamObserver<T> {
private boolean frozen;
private final ClientCall<T, ?> call;
private final boolean streamingResponse;
private Runnable onReadyHandler;
private boolean autoFlowControlEnabled = true;
private int initialRequest = 1;
private boolean autoRequestEnabled = true;
private boolean aborted = false;
private boolean completed = false;

// Non private to avoid synthetic class
CallToStreamObserverAdapter(ClientCall<T, ?> call) {
CallToStreamObserverAdapter(ClientCall<T, ?> call, boolean streamingResponse) {
this.call = call;
this.streamingResponse = streamingResponse;
}

private void freeze() {
Expand Down Expand Up @@ -376,18 +372,32 @@ public void setOnReadyHandler(Runnable onReadyHandler) {
this.onReadyHandler = onReadyHandler;
}

@Deprecated
@Override
public void disableAutoInboundFlowControl() {
disableAutoRequestWithInitial(1);
}

@Override
public void disableAutoRequestWithInitial(int request) {
if (frozen) {
throw new IllegalStateException(
"Cannot disable auto flow control after call started. Use ClientResponseObserver");
}
autoFlowControlEnabled = false;
Preconditions.checkArgument(request >= 0, "Initial requests must be non-negative");
initialRequest = request;
autoRequestEnabled = false;
}

@Override
public void request(int count) {
call.request(count);
if (!streamingResponse && count == 1) {
// Initially ask for two responses from flow-control so that if a misbehaving server
// sends more than one responses, we can catch it and fail it in the listener.
call.request(2);
} else {
call.request(count);
}
}

@Override
Expand All @@ -402,19 +412,16 @@ public void cancel(@Nullable String message, @Nullable Throwable cause) {
}

private static final class StreamObserverToCallListenerAdapter<ReqT, RespT>
extends ClientCall.Listener<RespT> {
extends StartableListener<RespT> {
private final StreamObserver<RespT> observer;
private final CallToStreamObserverAdapter<ReqT> adapter;
private final boolean streamingResponse;
private boolean firstResponseReceived;

// Non private to avoid synthetic class
StreamObserverToCallListenerAdapter(
StreamObserver<RespT> observer,
CallToStreamObserverAdapter<ReqT> adapter,
boolean streamingResponse) {
CallToStreamObserverAdapter<ReqT> adapter) {
this.observer = observer;
this.streamingResponse = streamingResponse;
this.adapter = adapter;
if (observer instanceof ClientResponseObserver) {
@SuppressWarnings("unchecked")
Expand All @@ -431,15 +438,15 @@ public void onHeaders(Metadata headers) {

@Override
public void onMessage(RespT message) {
if (firstResponseReceived && !streamingResponse) {
if (firstResponseReceived && !adapter.streamingResponse) {
throw Status.INTERNAL
.withDescription("More than one responses received for unary or client-streaming call")
.asRuntimeException();
}
firstResponseReceived = true;
observer.onNext(message);

if (streamingResponse && adapter.autoFlowControlEnabled) {
if (adapter.streamingResponse && adapter.autoRequestEnabled) {
// Request delivery of the next inbound message.
adapter.request(1);
}
Expand All @@ -460,12 +467,19 @@ public void onReady() {
adapter.onReadyHandler.run();
}
}

@Override
void onStart() {
if (adapter.initialRequest > 0) {
adapter.request(adapter.initialRequest);
}
}
}

/**
* Completes a {@link GrpcFuture} using {@link StreamObserver} events.
*/
private static final class UnaryStreamToFuture<RespT> extends ClientCall.Listener<RespT> {
private static final class UnaryStreamToFuture<RespT> extends StartableListener<RespT> {
private final GrpcFuture<RespT> responseFuture;
private RespT value;

Expand Down Expand Up @@ -501,6 +515,11 @@ public void onClose(Status status, Metadata trailers) {
responseFuture.setException(status.asRuntimeException(trailers));
}
}

@Override
void onStart() {
responseFuture.call.request(2);
}
}

private static final class GrpcFuture<RespT> extends AbstractFuture<RespT> {
Expand Down Expand Up @@ -542,7 +561,7 @@ protected String pendingToString() {
private static final class BlockingResponseStream<T> implements Iterator<T> {
// Due to flow control, only needs to hold up to 2 items: 1 for value, 1 for close.
private final BlockingQueue<Object> buffer = new ArrayBlockingQueue<>(2);
private final ClientCall.Listener<T> listener = new QueuingListener();
private final StartableListener<T> listener = new QueuingListener();
private final ClientCall<?, T> call;
/** May be null. */
private final ThreadlessExecutor threadless;
Expand All @@ -560,7 +579,7 @@ private static final class BlockingResponseStream<T> implements Iterator<T> {
this.threadless = threadless;
}

ClientCall.Listener<T> listener() {
StartableListener<T> listener() {
return listener;
}

Expand Down Expand Up @@ -632,7 +651,7 @@ public void remove() {
throw new UnsupportedOperationException();
}

private final class QueuingListener extends ClientCall.Listener<T> {
private final class QueuingListener extends StartableListener<T> {
// Non private to avoid synthetic class
QueuingListener() {}

Expand All @@ -658,6 +677,11 @@ public void onClose(Status status, Metadata trailers) {
}
done = true;
}

@Override
void onStart() {
call.request(1);
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions stub/src/main/java/io/grpc/stub/ClientResponseObserver.java
Expand Up @@ -30,8 +30,8 @@ public interface ClientResponseObserver<ReqT, RespT> extends StreamObserver<Resp
* onReady events, disable auto inbound flow and perform other advanced functions.
*
* <p>Only the methods {@link ClientCallStreamObserver#setOnReadyHandler(Runnable)} and
* {@link ClientCallStreamObserver#disableAutoInboundFlowControl()} may be called within this
* callback
* {@link ClientCallStreamObserver#disableAutoRequestWithInitial(int)} may be called within
* this callback
*
* <pre>
* // Copy an iterator to the request stream under flow-control
Expand Down

0 comments on commit a9250c1

Please sign in to comment.