Skip to content

Commit

Permalink
xds: no longer use existing Subchannels after xDS resource becomes unavailable (#7081)
Browse files Browse the repository at this point in the history

Put the Channel into TRANSIENT_FAILURE when a CDS/EDS resource that is currently being watched becomes unavailable. CDS/EDS LB policies should shut down their downstream policy instances (i.e., stop using the current Subchannels for new RPCs) and propagate the TRANSIENT_FAILURE status to their parent policies (and, eventually, possibly to the Channel itself).
  • Loading branch information
voidzcy committed Jun 3, 2020
1 parent 1468579 commit 26cf60d
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 27 deletions.
18 changes: 7 additions & 11 deletions xds/src/main/java/io/grpc/xds/CdsLoadBalancer.java
Expand Up @@ -259,9 +259,6 @@ private final class ClusterWatcherImpl implements ClusterWatcher {
final EdsLoadBalancingHelper helper;
final ResolvedAddresses resolvedAddresses;

// EDS balancer for the cluster.
// Becomes non-null once handleResolvedAddresses() successfully.
// Assigned at most once.
@Nullable
LoadBalancer edsBalancer;

Expand Down Expand Up @@ -325,15 +322,14 @@ private void updateSslContextProvider(UpstreamTlsContext newUpstreamTlsContext)
@Override
public void onResourceDoesNotExist(String resourceName) {
logger.log(XdsLogLevel.INFO, "Resource {0} is unavailable", resourceName);
// TODO(chengyuanzhang): should unconditionally propagate to downstream instances and
// go to TRANSIENT_FAILURE.
if (edsBalancer == null) {
helper.updateBalancingState(
TRANSIENT_FAILURE,
new ErrorPicker(
Status.UNAVAILABLE.withDescription(
"Resource " + resourceName + " is unavailable")));
if (edsBalancer != null) {
edsBalancer.shutdown();
edsBalancer = null;
}
helper.updateBalancingState(
TRANSIENT_FAILURE,
new ErrorPicker(
Status.UNAVAILABLE.withDescription("Resource " + resourceName + " is unavailable")));
}

@Override
Expand Down
23 changes: 14 additions & 9 deletions xds/src/main/java/io/grpc/xds/EdsLoadBalancer.java
Expand Up @@ -364,16 +364,21 @@ public void onEndpointChanged(EndpointUpdate endpointUpdate) {
@Override
public void onResourceDoesNotExist(String resourceName) {
logger.log(XdsLogLevel.INFO, "Resource {0} is unavailable", resourceName);
// TODO(chengyuanzhang): should unconditionally propagate to downstream instances and
// go to TRANSIENT_FAILURE. This can only be achieved after LocalityStore is refactored
// to LoadBalancer.
if (!endpointsReceived) {
helper.updateBalancingState(
TRANSIENT_FAILURE,
new ErrorPicker(
Status.UNAVAILABLE.withDescription(
"Resource " + resourceName + " is unavailable")));
if (isReportingLoad) {
logger.log(
XdsLogLevel.INFO,
"Stop reporting loads for cluster: {0}, cluster_service: {1}",
clusterName,
clusterServiceName);
xdsClient.cancelClientStatsReport(clusterName, clusterServiceName);
isReportingLoad = false;
}
localityStore.reset();
helper.updateBalancingState(
TRANSIENT_FAILURE,
new ErrorPicker(
Status.UNAVAILABLE.withDescription(
"Resource " + resourceName + " is unavailable")));
}

@Override
Expand Down
72 changes: 70 additions & 2 deletions xds/src/test/java/io/grpc/xds/CdsLoadBalancerTest.java
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.truth.Truth.assertThat;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.xds.XdsLbPolicies.EDS_POLICY_NAME;
import static io.grpc.xds.internal.sds.CommonTlsContextTestsUtil.BAD_CLIENT_KEY_FILE;
Expand All @@ -44,12 +45,15 @@
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.CreateSubchannelArgs;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.SynchronizationContext;
import io.grpc.internal.FakeClock;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
Expand Down Expand Up @@ -221,7 +225,7 @@ public void handleResolutionErrorBeforeOrAfterCdsWorking() {
}

@Test
public void handleCdsConfigs() {
public void handleCdsConfigUpdate() {
assertThat(xdsClient).isNull();
ResolvedAddresses resolvedAddresses1 = ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
Expand Down Expand Up @@ -335,7 +339,7 @@ public void handleCdsConfigs() {
}

@Test
public void handleCdsConfigs_withUpstreamTlsContext() {
public void handleCdsConfigUpdate_withUpstreamTlsContext() {
assertThat(xdsClient).isNull();
ResolvedAddresses resolvedAddresses1 =
ResolvedAddresses.newBuilder()
Expand Down Expand Up @@ -478,6 +482,70 @@ private void verifyUpstreamTlsContextAttribute(
.isSameInstanceAs(xdsClientPool);
}

// Covers the case where the watched cluster resource is reported as nonexistent before any
// onClusterChanged() update has been delivered: the parent helper must be updated to
// TRANSIENT_FAILURE with a picker whose pick result carries an UNAVAILABLE status.
@Test
public void clusterWatcher_resourceNotExist() {
ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setAttributes(Attributes.newBuilder()
.set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool)
.build())
.setLoadBalancingPolicyConfig(new CdsConfig("foo.googleapis.com"))
.build();
cdsLoadBalancer.handleResolvedAddresses(resolvedAddresses);

// Capture the ClusterWatcher passed to xdsClient.watchClusterData() for the configured cluster.
ArgumentCaptor<ClusterWatcher> clusterWatcherCaptor = ArgumentCaptor.forClass(null);
verify(xdsClient).watchClusterData(eq("foo.googleapis.com"), clusterWatcherCaptor.capture());

ClusterWatcher clusterWatcher = clusterWatcherCaptor.getValue();
ArgumentCaptor<SubchannelPicker> pickerCaptor = ArgumentCaptor.forClass(null);
// Report the resource as nonexistent without any preceding cluster update.
clusterWatcher.onResourceDoesNotExist("foo.googleapis.com");
// NOTE(review): edsLoadBalancers is declared outside this excerpt; presumably it collects the
// EDS balancers created through the test's fake provider -- confirm.
assertThat(edsLoadBalancers).isEmpty();
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
// The captured picker must report the "resource unavailable" status on a pick.
PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(result.getStatus().getDescription())
.isEqualTo("Resource foo.googleapis.com is unavailable");
}

// Covers the case where the cluster resource becomes unavailable after a cluster update was
// already delivered and the downstream EDS balancer reported READY: the EDS balancer must be
// shut down and the parent helper updated to TRANSIENT_FAILURE with an UNAVAILABLE picker.
@Test
public void clusterWatcher_resourceRemoved() {
ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setAttributes(Attributes.newBuilder()
.set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool)
.build())
.setLoadBalancingPolicyConfig(new CdsConfig("foo.googleapis.com"))
.build();
cdsLoadBalancer.handleResolvedAddresses(resolvedAddresses);

// Capture the ClusterWatcher passed to xdsClient.watchClusterData() for the configured cluster.
ArgumentCaptor<ClusterWatcher> clusterWatcherCaptor = ArgumentCaptor.forClass(null);
verify(xdsClient).watchClusterData(eq("foo.googleapis.com"), clusterWatcherCaptor.capture());

ClusterWatcher clusterWatcher = clusterWatcherCaptor.getValue();
ArgumentCaptor<SubchannelPicker> pickerCaptor = ArgumentCaptor.forClass(null);
// Deliver a cluster update first so that a downstream EDS balancer exists.
clusterWatcher.onClusterChanged(
ClusterUpdate.newBuilder()
.setClusterName("foo.googleapis.com")
.setEdsServiceName("edsServiceFoo.googleapis.com")
.setLbPolicy("round_robin")
.build());
// NOTE(review): edsLoadBalancers/edsLbHelpers are declared outside this excerpt; presumably they
// queue the EDS balancers and helpers created through the test's fake provider -- confirm.
assertThat(edsLoadBalancers).hasSize(1);
assertThat(edsLbHelpers).hasSize(1);
LoadBalancer edsLoadBalancer = edsLoadBalancers.poll();
Helper edsHelper = edsLbHelpers.poll();
// Have the EDS balancer report READY and check the update reaches the parent helper unchanged.
SubchannelPicker subchannelPicker = mock(SubchannelPicker.class);
edsHelper.updateBalancingState(READY, subchannelPicker);
verify(helper).updateBalancingState(eq(READY), same(subchannelPicker));

// Now the resource goes away: the existing EDS balancer must be shut down and the parent
// helper must receive TRANSIENT_FAILURE.
clusterWatcher.onResourceDoesNotExist("foo.googleapis.com");
verify(edsLoadBalancer).shutdown();
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
// The captured picker must report the "resource unavailable" status on a pick.
PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(result.getStatus().getDescription())
.isEqualTo("Resource foo.googleapis.com is unavailable");
}

@Test
public void clusterWatcher_onErrorCalledBeforeAndAfterOnClusterChanged() {
ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder()
Expand Down
72 changes: 67 additions & 5 deletions xds/src/test/java/io/grpc/xds/EdsLoadBalancerTest.java
Expand Up @@ -62,6 +62,7 @@
import io.grpc.LoadBalancerRegistry;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.SynchronizationContext;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
Expand All @@ -82,6 +83,7 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -436,7 +438,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
}

@Test
public void handleAllDropUpdates_pickersAreDropped() {
public void edsResourceUpdate_allDrop() {
deliverResolvedAddresses(null, null, fakeEndpointPickingPolicy);

ClusterLoadAssignment clusterLoadAssignment = buildClusterLoadAssignment(
Expand Down Expand Up @@ -485,7 +487,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
}

@Test
public void handleLocalityAssignmentUpdates_pickersUpdatedFromChildBalancer() {
public void edsResourceUpdate_localityAssignmentChange() {
deliverResolvedAddresses(null, null, fakeEndpointPickingPolicy);

LbEndpoint endpoint11 = buildLbEndpoint("addr11.example.com", 8011, HEALTHY, 11);
Expand Down Expand Up @@ -549,7 +551,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
// Uses a fake LocalityStoreFactory that creates a mock LocalityStore, and verifies interaction
// between the EDS balancer and LocalityStore.
@Test
public void handleEndpointUpdates_delegateUpdatesToLocalityStore() {
public void edsResourceUpdate_endpointAssignmentChange() {
final ArrayDeque<LocalityStore> localityStores = new ArrayDeque<>();
localityStoreFactory = new LocalityStoreFactory() {
@Override
Expand Down Expand Up @@ -632,7 +634,67 @@ LocalityStore newLocalityStore(
}

@Test
public void verifyErrorPropagation_noPreviousEndpointUpdateReceived() {
// Covers the case where no EDS resource is ever received: after the wait below elapses, the
// helper must be updated to TRANSIENT_FAILURE with a picker whose pick result carries an
// UNAVAILABLE status naming CLUSTER_NAME.
public void edsResourceNotExist() {
deliverResolvedAddresses(null, null, fakeEndpointPickingPolicy);

// Forwarding 20 seconds so that the xds client will deem EDS resource not available.
fakeClock.forwardTime(20, TimeUnit.SECONDS);
// NOTE(review): childBalancers is declared outside this excerpt; presumably it holds the
// per-locality child balancers created so far -- confirm.
assertThat(childBalancers).isEmpty();
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
// The captured picker must report the "resource unavailable" status on a pick.
PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(result.getStatus().getDescription())
.isEqualTo("Resource " + CLUSTER_NAME + " is unavailable");
}

// Covers the case where endpoints were already received and a child balancer reported READY,
// and then the resource is removed: the child balancer must be shut down and the helper updated
// to TRANSIENT_FAILURE with an UNAVAILABLE picker naming CLUSTER_NAME.
@Test
public void edsResourceRemoved() {
deliverResolvedAddresses(null, null, fakeEndpointPickingPolicy);
// Deliver a load assignment with a single locality ("region"/"zone"/"subzone") that contains
// one HEALTHY endpoint and no drop overloads.
ClusterLoadAssignment clusterLoadAssignment =
buildClusterLoadAssignment(CLUSTER_NAME,
ImmutableList.of(
buildLocalityLbEndpoints("region", "zone", "subzone",
ImmutableList.of(
buildLbEndpoint("192.168.0.1", 8080, HEALTHY, 2)),
1, 0)),
ImmutableList.<DropOverload>of());
deliverClusterLoadAssignments(clusterLoadAssignment);

// Exactly one child balancer/helper pair is expected; both are looked up under "subzone".
assertThat(childBalancers).hasSize(1);
assertThat(childHelpers).hasSize(1);
LoadBalancer localityBalancer = childBalancers.get("subzone");
Helper localityBalancerHelper = childHelpers.get("subzone");
final Subchannel subchannel = mock(Subchannel.class);
// A picker that always returns the mock Subchannel, so the pick can be traced to the helper.
SubchannelPicker picker = new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withSubchannel(subchannel);
}
};
localityBalancerHelper.updateBalancingState(READY, picker);
verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
assertThat(result.getSubchannel()).isSameInstanceAs(subchannel);

// The whole cluster is no longer accessible.
// Note that EDS resource removal is achieved by CDS resource update.
// The response below is CDS-typed and carries an empty resource list.
responseObserver.onNext(
buildDiscoveryResponse(
String.valueOf(versionIno++),
Collections.<Any>emptyList(),
XdsClientImpl.ADS_TYPE_URL_CDS,
String.valueOf(nonce++)));

// After the removal, the child balancer must be shut down and the helper must be updated to
// TRANSIENT_FAILURE with a picker reporting the "resource unavailable" status.
verify(localityBalancer).shutdown();
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(result.getStatus().getDescription())
.isEqualTo("Resource " + CLUSTER_NAME + " is unavailable");
}

@Test
public void transientError_noPreviousEndpointUpdateReceived() {
deliverResolvedAddresses(null, null, fakeEndpointPickingPolicy);

// Forwarding 20 seconds so that the xds client will deem EDS resource not available.
Expand All @@ -641,7 +703,7 @@ public void verifyErrorPropagation_noPreviousEndpointUpdateReceived() {
}

@Test
public void verifyErrorPropagation_withPreviousEndpointUpdateReceived() {
public void transientError_withPreviousEndpointUpdateReceived() {
deliverResolvedAddresses(null, null, fakeEndpointPickingPolicy);
// Endpoint update received.
ClusterLoadAssignment clusterLoadAssignment =
Expand Down

0 comments on commit 26cf60d

Please sign in to comment.