Skip to content

Commit

Permalink
xds: only reschedule time for unresolved resources upon ADS stream re…
Browse files Browse the repository at this point in the history
…starts (#7582)

Since the xDS resource version info persists across ADS stream recreation so that the management server can choose to not send client resources that have already been sent previously (in the previous stream). The client should not consider previously received (resolved) resources not exist if it does not receive them on the new ADS stream. So initial resource fetch timers should only be scheduled for unresolved resources when the ADS stream is recreated.
  • Loading branch information
voidzcy committed Nov 2, 2020
1 parent bb6679e commit 7009c1a
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 5 deletions.
4 changes: 3 additions & 1 deletion xds/src/main/java/io/grpc/xds/ClientXdsClient.java
Expand Up @@ -754,8 +754,10 @@ void removeWatcher(ResourceWatcher watcher) {
watchers.remove(watcher);
}

// FIXME(chengyuanzhang): should only restart timer if the resource is still unresolved.
void restartTimer() {
if (data != null || absent) { // resource already resolved
return;
}
class ResourceNotFound implements Runnable {
@Override
public void run() {
Expand Down
25 changes: 21 additions & 4 deletions xds/src/test/java/io/grpc/xds/ClientXdsClientTest.java
Expand Up @@ -1463,7 +1463,7 @@ public void streamClosedAndRetryRaceWithAddRemoveWatchers() {
}

@Test
public void streamClosedAndRetryRestartResourceInitialFetchTimers() {
public void streamClosedAndRetryRestartsResourceInitialFetchTimerForUnresolvedResources() {
xdsClient.watchLdsResource(LDS_RESOURCE, ldsResourceWatcher);
xdsClient.watchRdsResource(RDS_RESOURCE, rdsResourceWatcher);
xdsClient.watchCdsResource(CDS_RESOURCE, cdsResourceWatcher);
Expand All @@ -1477,15 +1477,32 @@ public void streamClosedAndRetryRestartResourceInitialFetchTimers() {
Iterables.getOnlyElement(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER));
ScheduledTask edsResourceTimeout =
Iterables.getOnlyElement(fakeClock.getPendingTasks(EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER));
call.responseObserver.onError(Status.UNAVAILABLE.asException());

List<Any> listeners = ImmutableList.of(
Any.pack(buildListener(LDS_RESOURCE,
Any.pack(
HttpConnectionManager.newBuilder()
.setRouteConfig(buildRouteConfiguration("do not care", buildVirtualHosts(2)))
.build()))));
DiscoveryResponse response =
buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000");
call.responseObserver.onNext(response);
assertThat(ldsResourceTimeout.isCancelled()).isTrue();

List<Any> routeConfigs =
ImmutableList.of(Any.pack(buildRouteConfiguration(RDS_RESOURCE, buildVirtualHosts(2))));
response =
buildDiscoveryResponse("0", routeConfigs, ResourceType.RDS.typeUrl(), "0000");
call.responseObserver.onNext(response);
assertThat(rdsResourceTimeout.isCancelled()).isTrue();

call.responseObserver.onError(Status.UNAVAILABLE.asException());
assertThat(cdsResourceTimeout.isCancelled()).isTrue();
assertThat(edsResourceTimeout.isCancelled()).isTrue();

fakeClock.forwardNanos(10L);
assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1);
assertThat(fakeClock.getPendingTasks(RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1);
assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(0);
assertThat(fakeClock.getPendingTasks(RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(0);
assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1);
assertThat(fakeClock.getPendingTasks(EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1);
}
Expand Down

0 comments on commit 7009c1a

Please sign in to comment.