Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clarify CallStreamObserver's Javadoc #6561

Merged
merged 3 commits into from Jan 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -24,7 +24,6 @@

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;

public class ManualFlowControlServer {
Expand All @@ -42,31 +41,34 @@ public StreamObserver<HelloRequest> sayHelloStreaming(final StreamObserver<Hello
(ServerCallStreamObserver<HelloReply>) responseObserver;
serverCallStreamObserver.disableAutoInboundFlowControl();

// Guard against spurious onReady() calls caused by a race between onNext() and onReady(). If the transport
// toggles isReady() from false to true while onNext() is executing, but before onNext() checks isReady(),
// request(1) would be called twice - once by onNext() and once by the onReady() scheduled during onNext()'s
// execution.
final AtomicBoolean wasReady = new AtomicBoolean(false);

// Set up a back-pressure-aware consumer for the request stream. The onReadyHandler will be invoked
// when the consuming side has enough buffer space to receive more messages.
//
// Note: the onReadyHandler's invocation is serialized on the same thread pool as the incoming StreamObserver's
// onNext(), onError(), and onComplete() handlers. Blocking the onReadyHandler will prevent additional messages
// from being processed by the incoming StreamObserver. The onReadyHandler must return in a timely manor or else
// message processing throughput will suffer.
serverCallStreamObserver.setOnReadyHandler(new Runnable() {
class OnReadyHandler implements Runnable {
// Guard against spurious onReady() calls caused by a race between onNext() and onReady(). If the transport
// toggles isReady() from false to true while onNext() is executing, but before onNext() checks isReady(),
// request(1) would be called twice - once by onNext() and once by the onReady() scheduled during onNext()'s
// execution.
private boolean wasReady = false;

@Override
public void run() {
if (serverCallStreamObserver.isReady() && wasReady.compareAndSet(false, true)) {
if (serverCallStreamObserver.isReady() && !wasReady) {
wasReady = true;
logger.info("READY");
// Signal the request sender to send one message. This happens when isReady() turns true, signaling that
// the receive buffer has enough free space to receive more messages. Calling request() serves to prime
// the message pump.
serverCallStreamObserver.request(1);
}
}
});
}
final OnReadyHandler onReadyHandler = new OnReadyHandler();
serverCallStreamObserver.setOnReadyHandler(onReadyHandler);

// Give gRPC a StreamObserver that can observe and process incoming requests.
return new StreamObserver<HelloRequest>() {
Expand All @@ -90,16 +92,17 @@ public void onNext(HelloRequest request) {
// Check the provided ServerCallStreamObserver to see if it is still ready to accept more messages.
if (serverCallStreamObserver.isReady()) {
// Signal the sender to send another request. As long as isReady() stays true, the server will keep
// cycling through the loop of onNext() -> request()...onNext() -> request()... until either the client
// runs out of messages and ends the loop or the server runs out of receive buffer space.
// cycling through the loop of onNext() -> request(1)...onNext() -> request(1)... until the client runs
// out of messages and ends the loop (via onCompleted()).
//
// If the server runs out of buffer space, isReady() will turn false. When the receive buffer has
// sufficiently drained, isReady() will turn true, and the serverCallStreamObserver's onReadyHandler
// will be called to restart the message pump.
// If request() was called here with the argument of more than 1, the server might runs out of receive
// buffer space, and isReady() will turn false. When the receive buffer has sufficiently drained,
// isReady() will turn true, and the serverCallStreamObserver's onReadyHandler will be called to restart
// the message pump.
serverCallStreamObserver.request(1);
} else {
// If not, note that back-pressure has begun.
wasReady.set(false);
onReadyHandler.wasReady = false;
}
} catch (Throwable throwable) {
throwable.printStackTrace();
Expand Down
26 changes: 18 additions & 8 deletions stub/src/main/java/io/grpc/stub/CallStreamObserver.java
Expand Up @@ -19,20 +19,30 @@
import io.grpc.ExperimentalApi;

/**
* A refinement of StreamObserver provided by the GRPC runtime to the application that allows for
* more complex interactions with call behavior.
* A refinement of StreamObserver provided by the GRPC runtime to the application (the client or
* the server) that allows for more complex interactions with call behavior.
*
* <p>In any call there are logically two {@link StreamObserver} implementations:
* <p>In any call there are logically four {@link StreamObserver} implementations:
* <ul>
* <li>'inbound' - which the GRPC runtime calls when it receives messages from the
* remote peer. This is implemented by the application.
* <li>'inbound', client-side - which the GRPC runtime calls when it receives messages from
* the server. This is implemented by the client application and passed into a service method
* on a stub object.
* </li>
* <li>'outbound' - which the GRPC runtime provides to the application which it uses to
* send messages to the remote peer.
* <li>'outbound', client-side - which the GRPC runtime provides to the client application and the
* client uses this {@code StreamObserver} to send messages to the server.
* </li>
* <li>'inbound', server-side - which the GRPC runtime calls when it receives messages from
* the client. This is implemented by the server application and returned from service
* implementations of client-side streaming and bidirectional streaming methods.
* </li>
* <li>'outbound', server-side - which the GRPC runtime provides to the server application and
* the server uses this {@code StreamObserver} to send messages (responses) to the client.
* </li>
* </ul>
*
* <p>Implementations of this class represent the 'outbound' message stream.
* <p>Implementations of this class represent the 'outbound' message streams. The client-side
* one is {@link ClientCallStreamObserver} and the service-side one is
* {@link ServerCallStreamObserver}.
*
* <p>Like {@code StreamObserver}, implementations are not required to be thread-safe; if multiple
* threads will be writing to an instance concurrently, the application must synchronize its calls.
Expand Down