From 26cf60d8c7f84697bb043d2f136158188bdc1202 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Wed, 3 Jun 2020 01:51:39 +0000 Subject: [PATCH] xds: no longer use existing Subchannels after xDS resource becomes unavailable (#7081) Put the Channel into TRANSIENT_FAILURE when the CDS/EDS resource that is currently being watched becomes unavailable. CDS/EDS LB policies should shut down their downstream policy instances (stop using current Subchannels for new RPCs) and propagate TRANSIENT_FAILURE status to their parent policies (which may eventually propagate it to the Channel). --- .../java/io/grpc/xds/CdsLoadBalancer.java | 18 ++--- .../java/io/grpc/xds/EdsLoadBalancer.java | 23 +++--- .../java/io/grpc/xds/CdsLoadBalancerTest.java | 72 ++++++++++++++++++- .../java/io/grpc/xds/EdsLoadBalancerTest.java | 72 +++++++++++++++++-- 4 files changed, 158 insertions(+), 27 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer.java index e3dc2bad364..7d7c6d6bba7 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer.java @@ -259,9 +259,6 @@ private final class ClusterWatcherImpl implements ClusterWatcher { final EdsLoadBalancingHelper helper; final ResolvedAddresses resolvedAddresses; - // EDS balancer for the cluster. - // Becomes non-null once handleResolvedAddresses() successfully. - // Assigned at most once. @Nullable LoadBalancer edsBalancer; @@ -325,15 +322,14 @@ private void updateSslContextProvider(UpstreamTlsContext newUpstreamTlsContext) @Override public void onResourceDoesNotExist(String resourceName) { logger.log(XdsLogLevel.INFO, "Resource {0} is unavailable", resourceName); - // TODO(chengyuanzhang): should unconditionally propagate to downstream instances and - // go to TRANSIENT_FAILURE. 
- if (edsBalancer == null) { - helper.updateBalancingState( - TRANSIENT_FAILURE, - new ErrorPicker( - Status.UNAVAILABLE.withDescription( - "Resource " + resourceName + " is unavailable"))); + if (edsBalancer != null) { + edsBalancer.shutdown(); + edsBalancer = null; } + helper.updateBalancingState( + TRANSIENT_FAILURE, + new ErrorPicker( + Status.UNAVAILABLE.withDescription("Resource " + resourceName + " is unavailable"))); } @Override diff --git a/xds/src/main/java/io/grpc/xds/EdsLoadBalancer.java b/xds/src/main/java/io/grpc/xds/EdsLoadBalancer.java index 818e39f460f..34e90c7c7a1 100644 --- a/xds/src/main/java/io/grpc/xds/EdsLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/EdsLoadBalancer.java @@ -364,16 +364,21 @@ public void onEndpointChanged(EndpointUpdate endpointUpdate) { @Override public void onResourceDoesNotExist(String resourceName) { logger.log(XdsLogLevel.INFO, "Resource {0} is unavailable", resourceName); - // TODO(chengyuanzhang): should unconditionally propagate to downstream instances and - // go to TRANSIENT_FAILURE. This can only be achieved after LocalityStore is refactored - // to LoadBalancer. 
- if (!endpointsReceived) { - helper.updateBalancingState( - TRANSIENT_FAILURE, - new ErrorPicker( - Status.UNAVAILABLE.withDescription( - "Resource " + resourceName + " is unavailable"))); + if (isReportingLoad) { + logger.log( + XdsLogLevel.INFO, + "Stop reporting loads for cluster: {0}, cluster_service: {1}", + clusterName, + clusterServiceName); + xdsClient.cancelClientStatsReport(clusterName, clusterServiceName); + isReportingLoad = false; } + localityStore.reset(); + helper.updateBalancingState( + TRANSIENT_FAILURE, + new ErrorPicker( + Status.UNAVAILABLE.withDescription( + "Resource " + resourceName + " is unavailable"))); } @Override diff --git a/xds/src/test/java/io/grpc/xds/CdsLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/CdsLoadBalancerTest.java index 841ef97023e..c22cf82aeb3 100644 --- a/xds/src/test/java/io/grpc/xds/CdsLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/CdsLoadBalancerTest.java @@ -18,6 +18,7 @@ import static com.google.common.truth.Truth.assertThat; import static io.grpc.ConnectivityState.CONNECTING; +import static io.grpc.ConnectivityState.READY; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static io.grpc.xds.XdsLbPolicies.EDS_POLICY_NAME; import static io.grpc.xds.internal.sds.CommonTlsContextTestsUtil.BAD_CLIENT_KEY_FILE; @@ -44,12 +45,15 @@ import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.CreateSubchannelArgs; import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancer.PickResult; +import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.ResolvedAddresses; import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerRegistry; import io.grpc.NameResolver.ConfigOrError; import io.grpc.Status; +import io.grpc.Status.Code; import io.grpc.SynchronizationContext; import io.grpc.internal.FakeClock; import io.grpc.internal.ServiceConfigUtil.PolicySelection; @@ -221,7 +225,7 @@ public void 
handleResolutionErrorBeforeOrAfterCdsWorking() { } @Test - public void handleCdsConfigs() { + public void handleCdsConfigUpdate() { assertThat(xdsClient).isNull(); ResolvedAddresses resolvedAddresses1 = ResolvedAddresses.newBuilder() .setAddresses(ImmutableList.of()) @@ -335,7 +339,7 @@ public void handleCdsConfigs() { } @Test - public void handleCdsConfigs_withUpstreamTlsContext() { + public void handleCdsConfigUpdate_withUpstreamTlsContext() { assertThat(xdsClient).isNull(); ResolvedAddresses resolvedAddresses1 = ResolvedAddresses.newBuilder() @@ -478,6 +482,70 @@ private void verifyUpstreamTlsContextAttribute( .isSameInstanceAs(xdsClientPool); } + @Test + public void clusterWatcher_resourceNotExist() { + ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.of()) + .setAttributes(Attributes.newBuilder() + .set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool) + .build()) + .setLoadBalancingPolicyConfig(new CdsConfig("foo.googleapis.com")) + .build(); + cdsLoadBalancer.handleResolvedAddresses(resolvedAddresses); + + ArgumentCaptor clusterWatcherCaptor = ArgumentCaptor.forClass(null); + verify(xdsClient).watchClusterData(eq("foo.googleapis.com"), clusterWatcherCaptor.capture()); + + ClusterWatcher clusterWatcher = clusterWatcherCaptor.getValue(); + ArgumentCaptor pickerCaptor = ArgumentCaptor.forClass(null); + clusterWatcher.onResourceDoesNotExist("foo.googleapis.com"); + assertThat(edsLoadBalancers).isEmpty(); + verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(result.getStatus().getDescription()) + .isEqualTo("Resource foo.googleapis.com is unavailable"); + } + + @Test + public void clusterWatcher_resourceRemoved() { + ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder() + 
.setAddresses(ImmutableList.of()) + .setAttributes(Attributes.newBuilder() + .set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool) + .build()) + .setLoadBalancingPolicyConfig(new CdsConfig("foo.googleapis.com")) + .build(); + cdsLoadBalancer.handleResolvedAddresses(resolvedAddresses); + + ArgumentCaptor clusterWatcherCaptor = ArgumentCaptor.forClass(null); + verify(xdsClient).watchClusterData(eq("foo.googleapis.com"), clusterWatcherCaptor.capture()); + + ClusterWatcher clusterWatcher = clusterWatcherCaptor.getValue(); + ArgumentCaptor pickerCaptor = ArgumentCaptor.forClass(null); + clusterWatcher.onClusterChanged( + ClusterUpdate.newBuilder() + .setClusterName("foo.googleapis.com") + .setEdsServiceName("edsServiceFoo.googleapis.com") + .setLbPolicy("round_robin") + .build()); + assertThat(edsLoadBalancers).hasSize(1); + assertThat(edsLbHelpers).hasSize(1); + LoadBalancer edsLoadBalancer = edsLoadBalancers.poll(); + Helper edsHelper = edsLbHelpers.poll(); + SubchannelPicker subchannelPicker = mock(SubchannelPicker.class); + edsHelper.updateBalancingState(READY, subchannelPicker); + verify(helper).updateBalancingState(eq(READY), same(subchannelPicker)); + + clusterWatcher.onResourceDoesNotExist("foo.googleapis.com"); + verify(edsLoadBalancer).shutdown(); + verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(result.getStatus().getDescription()) + .isEqualTo("Resource foo.googleapis.com is unavailable"); + } + @Test public void clusterWatcher_onErrorCalledBeforeAndAfterOnClusterChanged() { ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder() diff --git a/xds/src/test/java/io/grpc/xds/EdsLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/EdsLoadBalancerTest.java index 845858ae943..38c9fe50032 100644 --- 
a/xds/src/test/java/io/grpc/xds/EdsLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/EdsLoadBalancerTest.java @@ -62,6 +62,7 @@ import io.grpc.LoadBalancerRegistry; import io.grpc.ManagedChannel; import io.grpc.Status; +import io.grpc.Status.Code; import io.grpc.SynchronizationContext; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; @@ -82,6 +83,7 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -436,7 +438,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { } @Test - public void handleAllDropUpdates_pickersAreDropped() { + public void edsResourceUpdate_allDrop() { deliverResolvedAddresses(null, null, fakeEndpointPickingPolicy); ClusterLoadAssignment clusterLoadAssignment = buildClusterLoadAssignment( @@ -485,7 +487,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { } @Test - public void handleLocalityAssignmentUpdates_pickersUpdatedFromChildBalancer() { + public void edsResourceUpdate_localityAssignmentChange() { deliverResolvedAddresses(null, null, fakeEndpointPickingPolicy); LbEndpoint endpoint11 = buildLbEndpoint("addr11.example.com", 8011, HEALTHY, 11); @@ -549,7 +551,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { // Uses a fake LocalityStoreFactory that creates a mock LocalityStore, and verifies interaction // between the EDS balancer and LocalityStore. 
@Test - public void handleEndpointUpdates_delegateUpdatesToLocalityStore() { + public void edsResourceUpdate_endpointAssignmentChange() { final ArrayDeque localityStores = new ArrayDeque<>(); localityStoreFactory = new LocalityStoreFactory() { @Override @@ -632,7 +634,67 @@ LocalityStore newLocalityStore( } @Test - public void verifyErrorPropagation_noPreviousEndpointUpdateReceived() { + public void edsResourceNotExist() { + deliverResolvedAddresses(null, null, fakeEndpointPickingPolicy); + + // Forwarding 20 seconds so that the xds client will deem EDS resource not available. + fakeClock.forwardTime(20, TimeUnit.SECONDS); + assertThat(childBalancers).isEmpty(); + verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(result.getStatus().getDescription()) + .isEqualTo("Resource " + CLUSTER_NAME + " is unavailable"); + } + + @Test + public void edsResourceRemoved() { + deliverResolvedAddresses(null, null, fakeEndpointPickingPolicy); + ClusterLoadAssignment clusterLoadAssignment = + buildClusterLoadAssignment(CLUSTER_NAME, + ImmutableList.of( + buildLocalityLbEndpoints("region", "zone", "subzone", + ImmutableList.of( + buildLbEndpoint("192.168.0.1", 8080, HEALTHY, 2)), + 1, 0)), + ImmutableList.of()); + deliverClusterLoadAssignments(clusterLoadAssignment); + + assertThat(childBalancers).hasSize(1); + assertThat(childHelpers).hasSize(1); + LoadBalancer localityBalancer = childBalancers.get("subzone"); + Helper localityBalancerHelper = childHelpers.get("subzone"); + final Subchannel subchannel = mock(Subchannel.class); + SubchannelPicker picker = new SubchannelPicker() { + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + return PickResult.withSubchannel(subchannel); + } + }; + localityBalancerHelper.updateBalancingState(READY, picker); + 
verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); + PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getSubchannel()).isSameInstanceAs(subchannel); + + // The whole cluster is no longer accessible. + // Note that EDS resource removal is achieved by CDS resource update. + responseObserver.onNext( + buildDiscoveryResponse( + String.valueOf(versionIno++), + Collections.emptyList(), + XdsClientImpl.ADS_TYPE_URL_CDS, + String.valueOf(nonce++))); + + verify(localityBalancer).shutdown(); + verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(result.getStatus().getDescription()) + .isEqualTo("Resource " + CLUSTER_NAME + " is unavailable"); + } + + @Test + public void transientError_noPreviousEndpointUpdateReceived() { deliverResolvedAddresses(null, null, fakeEndpointPickingPolicy); // Forwarding 20 seconds so that the xds client will deem EDS resource not available. @@ -641,7 +703,7 @@ public void verifyErrorPropagation_noPreviousEndpointUpdateReceived() { } @Test - public void verifyErrorPropagation_withPreviousEndpointUpdateReceived() { + public void transientError_withPreviousEndpointUpdateReceived() { deliverResolvedAddresses(null, null, fakeEndpointPickingPolicy); // Endpoint update received. ClusterLoadAssignment clusterLoadAssignment =