From 7009c1a863944a07ab6f5a2ccb7741d48f274777 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Mon, 2 Nov 2020 12:12:31 -0800 Subject: [PATCH] xds: only reschedule time for unresolved resources upon ADS stream restarts (#7582) Since the xDS resource version info persists across ADS stream recreation so that the management server can choose to not send client resources that have already been sent previously (in the previous stream). The client should not consider previously received (resolved) resources not exist if it does not receive them on the new ADS stream. So initial resource fetch timers should only be scheduled for unresolved resources when the ADS stream is recreated. --- .../java/io/grpc/xds/ClientXdsClient.java | 4 ++- .../java/io/grpc/xds/ClientXdsClientTest.java | 25 ++++++++++++++++--- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java index 5c1d8fcb94a..8ddbff0d550 100644 --- a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java +++ b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java @@ -754,8 +754,10 @@ void removeWatcher(ResourceWatcher watcher) { watchers.remove(watcher); } - // FIXME(chengyuanzhang): should only restart timer if the resource is still unresolved. void restartTimer() { + if (data != null || absent) { // resource already resolved + return; + } class ResourceNotFound implements Runnable { @Override public void run() { diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientTest.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientTest.java index e1862e53617..d8430245c5c 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientTest.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientTest.java @@ -1463,7 +1463,7 @@ public void streamClosedAndRetryRaceWithAddRemoveWatchers() { } @Test - public void streamClosedAndRetryRestartResourceInitialFetchTimers() { + public void streamClosedAndRetryRestartsResourceInitialFetchTimerForUnresolvedResources() { xdsClient.watchLdsResource(LDS_RESOURCE, ldsResourceWatcher); xdsClient.watchRdsResource(RDS_RESOURCE, rdsResourceWatcher); xdsClient.watchCdsResource(CDS_RESOURCE, cdsResourceWatcher); @@ -1477,15 +1477,32 @@ public void streamClosedAndRetryRestartResourceInitialFetchTimers() { Iterables.getOnlyElement(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)); ScheduledTask edsResourceTimeout = Iterables.getOnlyElement(fakeClock.getPendingTasks(EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)); - call.responseObserver.onError(Status.UNAVAILABLE.asException()); + + List listeners = ImmutableList.of( + Any.pack(buildListener(LDS_RESOURCE, + Any.pack( + HttpConnectionManager.newBuilder() + .setRouteConfig(buildRouteConfiguration("do not care", buildVirtualHosts(2))) + .build())))); + DiscoveryResponse response = + buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000"); + call.responseObserver.onNext(response); assertThat(ldsResourceTimeout.isCancelled()).isTrue(); + + List routeConfigs = + ImmutableList.of(Any.pack(buildRouteConfiguration(RDS_RESOURCE, buildVirtualHosts(2)))); + response = + buildDiscoveryResponse("0", routeConfigs, ResourceType.RDS.typeUrl(), "0000"); + call.responseObserver.onNext(response); assertThat(rdsResourceTimeout.isCancelled()).isTrue(); + + call.responseObserver.onError(Status.UNAVAILABLE.asException()); assertThat(cdsResourceTimeout.isCancelled()).isTrue(); assertThat(edsResourceTimeout.isCancelled()).isTrue(); fakeClock.forwardNanos(10L); - assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1); - assertThat(fakeClock.getPendingTasks(RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1); + assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(0); + assertThat(fakeClock.getPendingTasks(RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(0); assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1); assertThat(fakeClock.getPendingTasks(EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1); }