Skip to content

Commit

Permalink
api,stub: clarify StreamObserver and Listener param type (#8544)
Browse files Browse the repository at this point in the history
  • Loading branch information
dapengzhang0 committed Sep 21, 2021
1 parent 25022f6 commit 29d238a
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 13 deletions.
2 changes: 2 additions & 0 deletions api/src/main/java/io/grpc/ClientCall.java
Expand Up @@ -108,6 +108,8 @@ public abstract class ClientCall<ReqT, RespT> {
* an instance from multiple threads, but only one call simultaneously. A single thread may
* interleave calls to multiple instances, so implementations using ThreadLocals must be careful
* to avoid leaking inappropriate state (e.g., clearing the ThreadLocal before returning).
*
* @param <T> type of message received.
*/
public abstract static class Listener<T> {

Expand Down
2 changes: 2 additions & 0 deletions stub/src/main/java/io/grpc/stub/CallStreamObserver.java
Expand Up @@ -49,6 +49,8 @@
*
* <p>DO NOT MOCK: The API is too complex to reliably mock. Use InProcessChannelBuilder to create
* "real" RPCs suitable for testing.
*
* @param <V> type of outbound message.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8499")
public abstract class CallStreamObserver<V> implements StreamObserver<V> {
Expand Down
6 changes: 2 additions & 4 deletions stub/src/main/java/io/grpc/stub/ClientCallStreamObserver.java
Expand Up @@ -29,7 +29,7 @@
* <p>DO NOT MOCK: The API is too complex to reliably mock. Use InProcessChannelBuilder to create
* "real" RPCs suitable for testing and make a fake for the server-side.
*/
public abstract class ClientCallStreamObserver<V> extends CallStreamObserver<V> {
public abstract class ClientCallStreamObserver<ReqT> extends CallStreamObserver<ReqT> {
/**
* Prevent any further processing for this {@code ClientCallStreamObserver}. No further messages
* will be received. The server is informed of cancellations, but may not stop processing the
Expand Down Expand Up @@ -78,9 +78,7 @@ public void disableAutoRequestWithInitial(int request) {
* thread will always be used to execute the {@link Runnable}, it is guaranteed that executions
* are serialized with calls to the 'inbound' {@link StreamObserver}.
*
* <p>On client-side this method may only be called during {@link
* ClientResponseObserver#beforeStart}. On server-side it may only be called during the initial
* call to the application, before the service returns its {@code StreamObserver}.
* <p>May only be called during {@link ClientResponseObserver#beforeStart}.
*
* <p>Because there is a processing delay to deliver this notification, it is possible for
* concurrent writes to cause {@code isReady() == false} within this callback. Handle "spurious"
Expand Down
9 changes: 5 additions & 4 deletions stub/src/main/java/io/grpc/stub/ClientCalls.java
Expand Up @@ -337,9 +337,10 @@ private abstract static class StartableListener<T> extends ClientCall.Listener<T
abstract void onStart();
}

private static final class CallToStreamObserverAdapter<T> extends ClientCallStreamObserver<T> {
private static final class CallToStreamObserverAdapter<ReqT>
extends ClientCallStreamObserver<ReqT> {
private boolean frozen;
private final ClientCall<T, ?> call;
private final ClientCall<ReqT, ?> call;
private final boolean streamingResponse;
private Runnable onReadyHandler;
private int initialRequest = 1;
Expand All @@ -348,7 +349,7 @@ private static final class CallToStreamObserverAdapter<T> extends ClientCallStre
private boolean completed = false;

// Non private to avoid synthetic class
CallToStreamObserverAdapter(ClientCall<T, ?> call, boolean streamingResponse) {
CallToStreamObserverAdapter(ClientCall<ReqT, ?> call, boolean streamingResponse) {
this.call = call;
this.streamingResponse = streamingResponse;
}
Expand All @@ -358,7 +359,7 @@ private void freeze() {
}

@Override
public void onNext(T value) {
public void onNext(ReqT value) {
checkState(!aborted, "Stream was terminated by error, no further calls are allowed");
checkState(!completed, "Stream is already completed, no further calls are allowed");
call.sendMessage(value);
Expand Down
7 changes: 3 additions & 4 deletions stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
Expand Up @@ -27,7 +27,7 @@
* <p>DO NOT MOCK: The API is too complex to reliably mock. Use InProcessChannelBuilder to create
* "real" RPCs suitable for testing and interact with the server using a normal client stub.
*/
public abstract class ServerCallStreamObserver<V> extends CallStreamObserver<V> {
public abstract class ServerCallStreamObserver<RespT> extends CallStreamObserver<RespT> {

/**
* Returns {@code true} when the call is cancelled and the server is encouraged to abort
Expand Down Expand Up @@ -113,9 +113,8 @@ public void disableAutoRequest() {
* thread will always be used to execute the {@link Runnable}, it is guaranteed that executions
* are serialized with calls to the 'inbound' {@link StreamObserver}.
*
* <p>On client-side this method may only be called during {@link
* ClientResponseObserver#beforeStart}. On server-side it may only be called during the initial
* call to the application, before the service returns its {@code StreamObserver}.
* <p>May only be called during the initial call to the application, before the service returns
* its {@code StreamObserver}.
*
* <p>Because there is a processing delay to deliver this notification, it is possible for
* concurrent writes to cause {@code isReady() == false} within this callback. Handle "spurious"
Expand Down
2 changes: 1 addition & 1 deletion stub/src/main/java/io/grpc/stub/ServerCalls.java
Expand Up @@ -447,7 +447,7 @@ public static void asyncUnimplementedUnaryCall(
* @param methodDescriptor of method for which error will be thrown.
* @param responseObserver on which error will be set.
*/
public static <T> StreamObserver<T> asyncUnimplementedStreamingCall(
public static <ReqT> StreamObserver<ReqT> asyncUnimplementedStreamingCall(
MethodDescriptor<?, ?> methodDescriptor, StreamObserver<?> responseObserver) {
// NB: For streaming call we want to do the same as for unary call. Fail-fast by setting error
// on responseObserver and then return no-op observer.
Expand Down

0 comments on commit 29d238a

Please sign in to comment.