Skip to content

Commit

Permalink
fix: Fix watchdog to start with WAITING state (#2468)
Browse files Browse the repository at this point in the history
Watchdog should start with WAITING state, and only switch to `idle` if
auto flow control was disabled. Before the fix, when auto flow control
was enabled, we wait for server to return a response without calling
`onRequest()` and watchdog would report the timeout exception because of
idle timeout, which is incorrect and causes confusion.

Before submitting your PR, there are a few things you can do to make
sure it goes smoothly:

- [ ] Make sure to open an issue as a
[bug/issue](https://github.com/googleapis/gapic-generator-java/issues/new/choose)
before writing your code! That way we can discuss the change, evaluate
designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Fixes #2498 ☕️

---------

Co-authored-by: Igor Bernstein <igorbernstein@google.com>
Co-authored-by: Lawrence Qiu <lawrenceqiu@google.com>
  • Loading branch information
3 people committed Feb 23, 2024
1 parent fc18449 commit dedc40f
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 1 deletion.
15 changes: 14 additions & 1 deletion gax-java/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java
Expand Up @@ -193,8 +193,11 @@ class WatchdogStream<ResponseT> extends StateCheckingResponseObserver<ResponseT>
private final ResponseObserver<ResponseT> outerResponseObserver;
private volatile StreamController innerController;

// When a stream is created it has automatic inbound flow control enabled. The stream
// won't wait for the caller to request a message. Setting the default to WAITING
// to reflect this state.
@GuardedBy("lock")
private State state = State.IDLE;
private State state = State.WAITING;

@GuardedBy("lock")
private int pendingCount = 0;
Expand All @@ -220,6 +223,16 @@ public void onStartImpl(StreamController controller) {
public void disableAutoInboundFlowControl() {
Preconditions.checkState(
!hasStarted, "Can't disable automatic flow control after the stream has started");

// Adding the lock only to satisfy the annotation. It doesn't matter because before
// the stream is started, this is only accessed by the caller.
synchronized (lock) {
// When auto flow control is disabled, caller needs to call onRequest() to request a
// message. Setting the state to IDLE because now we're waiting for caller to call
// onRequest().
state = State.IDLE;
}

autoAutoFlowControl = false;
innerController.disableAutoInboundFlowControl();
}
Expand Down
Expand Up @@ -154,6 +154,31 @@ public void testTimedOutBeforeStart() throws InterruptedException {
assertThat(error).isInstanceOf(WatchdogTimeoutException.class);
}

@Test
public void testTimedOutBeforeResponse() throws InterruptedException {
MockServerStreamingCallable<String, String> autoFlowControlCallable =
new MockServerStreamingCallable<>();
AutoFlowControlObserver<String> downstreamObserver = new AutoFlowControlObserver<>();

autoFlowControlCallable.call("request", watchdog.watch(downstreamObserver, waitTime, idleTime));
MockServerStreamingCall<String, String> call1 = autoFlowControlCallable.popLastCall();

clock.incrementNanoTime(idleTime.toNanos() + 1);
watchdog.run();
assertThat(downstreamObserver.done.isDone()).isFalse();
assertThat(call1.getController().isCancelled()).isTrue();
call1.getController().getObserver().onError(new CancellationException("cancelled"));

Throwable actualError = null;
try {
downstreamObserver.done.get();
} catch (ExecutionException e) {
actualError = e.getCause();
}
assertThat(actualError).isInstanceOf(WatchdogTimeoutException.class);
assertThat(actualError.getMessage()).contains("waiting for next response");
}

@Test
public void testMultiple() throws Exception {
// Start stream1
Expand Down Expand Up @@ -310,4 +335,30 @@ public void onComplete() {
done.set(null);
}
}

static class AutoFlowControlObserver<T> implements ResponseObserver<T> {
SettableApiFuture<StreamController> controller = SettableApiFuture.create();
Queue<T> responses = Queues.newLinkedBlockingDeque();
SettableApiFuture<Void> done = SettableApiFuture.create();

@Override
public void onStart(StreamController controller) {
this.controller.set(controller);
}

@Override
public void onResponse(T response) {
responses.add(response);
}

@Override
public void onError(Throwable t) {
done.setException(t);
}

@Override
public void onComplete() {
done.set(null);
}
}
}

0 comments on commit dedc40f

Please sign in to comment.