From 140a9ae7f1cd501c1b2e259edfa31b6f84059777 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Wed, 22 Jul 2020 08:16:51 +0000 Subject: [PATCH] xds: routing policy should immediately update a picker that selects base on updated config (#7233) The routing LB policy should immediately the Channel's picker that delegates picking to the updated routes. Otherwise, new RPCs will keep being sent through old routes even if they are removed. This change also includes the fix for syncing state change for child balancers in deactivated state. --- .../io/grpc/xds/XdsRoutingLoadBalancer.java | 18 +- .../grpc/xds/XdsRoutingLoadBalancerTest.java | 279 +++++++++++++----- 2 files changed, 216 insertions(+), 81 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/XdsRoutingLoadBalancer.java b/xds/src/main/java/io/grpc/xds/XdsRoutingLoadBalancer.java index 1cd5045052a2..9a18b2388623 100644 --- a/xds/src/main/java/io/grpc/xds/XdsRoutingLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/XdsRoutingLoadBalancer.java @@ -86,14 +86,14 @@ public void handleResolvedAddresses(final ResolvedAddresses resolvedAddresses) { } else { childLbStates.get(actionName).reactivate(action.getProvider()); } + final LoadBalancer childLb = childLbStates.get(actionName).lb; syncContext.execute(new Runnable() { @Override public void run() { - childLbStates.get(actionName).lb - .handleResolvedAddresses( - resolvedAddresses.toBuilder() - .setLoadBalancingPolicyConfig(action.getConfig()) - .build()); + childLb.handleResolvedAddresses( + resolvedAddresses.toBuilder() + .setLoadBalancingPolicyConfig(action.getConfig()) + .build()); } }); } @@ -102,6 +102,7 @@ public void run() { for (String actionName : diff) { childLbStates.get(actionName).deactivate(); } + updateOverallBalancingState(); } @Override @@ -235,12 +236,11 @@ private final class RouteHelper extends ForwardingLoadBalancerHelper { @Override public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) { - if (deactivated) { - return; - } currentState = newState; currentPicker = newPicker; - updateOverallBalancingState(); + if (!deactivated) { + updateOverallBalancingState(); + } } @Override diff --git a/xds/src/test/java/io/grpc/xds/XdsRoutingLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/XdsRoutingLoadBalancerTest.java index 54646c37ea3a..f9bc47fc55e0 100644 --- a/xds/src/test/java/io/grpc/xds/XdsRoutingLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsRoutingLoadBalancerTest.java @@ -17,12 +17,18 @@ package io.grpc.xds; import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import io.grpc.CallOptions; import io.grpc.ConnectivityState; import io.grpc.EquivalentAddressGroup; @@ -38,6 +44,7 @@ import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor.MethodType; import io.grpc.Status; +import io.grpc.Status.Code; import io.grpc.SynchronizationContext; import io.grpc.internal.FakeClock; import io.grpc.internal.PickSubchannelArgsImpl; @@ -51,14 +58,18 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.ArgumentCaptor; +import org.mockito.Captor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -77,6 +88,8 @@ public void uncaughtException(Thread t, Throwable e) { @Mock private LoadBalancer.Helper helper; + @Captor + ArgumentCaptor pickerCaptor; private RouteMatch routeMatch1 = new RouteMatch( @@ -95,7 +108,8 @@ public void uncaughtException(Thread t, Throwable e) { new PathMatcher(null, "/", null), Collections.emptyList(), null); - private List childBalancers = new ArrayList<>(); + private final Map lbConfigInventory = new HashMap<>(); + private final List childBalancers = new ArrayList<>(); private LoadBalancer xdsRoutingLoadBalancer; @Before @@ -103,107 +117,228 @@ public void setUp() { MockitoAnnotations.initMocks(this); when(helper.getSynchronizationContext()).thenReturn(syncContext); when(helper.getScheduledExecutorService()).thenReturn(fakeClock.getScheduledExecutorService()); + lbConfigInventory.put("actionA", new Object()); + lbConfigInventory.put("actionB", new Object()); + lbConfigInventory.put("actionC", null); xdsRoutingLoadBalancer = new XdsRoutingLoadBalancer(helper); } - @Test - public void typicalWorkflow() { - Object childConfig1 = new Object(); - Object childConfig2 = new Object(); - PolicySelection policyA = - new PolicySelection(new FakeLoadBalancerProvider("policy_a"), null, childConfig1); - PolicySelection policyB = - new PolicySelection(new FakeLoadBalancerProvider("policy_b"), null, childConfig2); - PolicySelection policyC = - new PolicySelection(new FakeLoadBalancerProvider("policy_c"), null , null); - - XdsRoutingConfig config = - new XdsRoutingConfig( - Arrays.asList( - new Route(routeMatch1, "action_a"), - new Route(routeMatch2, "action_b"), - new Route(routeMatch3, "action_a")), - ImmutableMap.of("action_a", policyA, "action_b", policyB)); - xdsRoutingLoadBalancer - .handleResolvedAddresses( - ResolvedAddresses.newBuilder() - .setAddresses(Collections.emptyList()) - .setLoadBalancingPolicyConfig(config) - .build()); + @After + public void tearDown() { + xdsRoutingLoadBalancer.shutdown(); + for (FakeLoadBalancer childLb : childBalancers) { + assertThat(childLb.shutdown).isTrue(); + } + } + @Test + public void handleResolvedAddressesUpdatesChannelPicker() { + deliverResolvedAddresses( + ImmutableMap.of( + new Route(routeMatch1, "actionA"), "policy_a", + new Route(routeMatch2, "actionB"), "policy_b")); + + verify(helper, atLeastOnce()).updateBalancingState( + eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); + RouteMatchingSubchannelPicker picker = (RouteMatchingSubchannelPicker) pickerCaptor.getValue(); + assertThat(picker.routePickers).hasSize(2); + assertThat(picker.routePickers.get(routeMatch1).pickSubchannel(mock(PickSubchannelArgs.class))) + .isEqualTo(PickResult.withNoResult()); + assertThat(picker.routePickers.get(routeMatch2).pickSubchannel(mock(PickSubchannelArgs.class))) + .isEqualTo(PickResult.withNoResult()); assertThat(childBalancers).hasSize(2); FakeLoadBalancer childBalancer1 = childBalancers.get(0); FakeLoadBalancer childBalancer2 = childBalancers.get(1); assertThat(childBalancer1.name).isEqualTo("policy_a"); assertThat(childBalancer2.name).isEqualTo("policy_b"); - assertThat(childBalancer1.config).isEqualTo(childConfig1); - assertThat(childBalancer2.config).isEqualTo(childConfig2); - - // Receive an updated routing config. - config = - new XdsRoutingConfig( - Arrays.asList( - new Route(routeMatch1, "action_b"), - new Route(routeMatch2, "action_c"), - new Route(routeMatch3, "action_c")), - ImmutableMap.of("action_b", policyA, "action_c", policyC)); - xdsRoutingLoadBalancer - .handleResolvedAddresses( - ResolvedAddresses.newBuilder() - .setAddresses(Collections.emptyList()) - .setLoadBalancingPolicyConfig(config) - .build()); - - assertThat(childBalancer2.shutdown) - .isTrue(); // (immediate) shutdown because "action_b" changes policy (before ready) + assertThat(childBalancer1.config).isEqualTo(lbConfigInventory.get("actionA")); + assertThat(childBalancer2.config).isEqualTo(lbConfigInventory.get("actionB")); + + // Receive an updated config. + deliverResolvedAddresses( + ImmutableMap.of( + new Route(routeMatch1, "actionA"), "policy_a", + new Route(routeMatch3, "actionC"), "policy_c")); + + verify(helper, atLeast(2)) + .updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); + picker = (RouteMatchingSubchannelPicker) pickerCaptor.getValue(); + assertThat(picker.routePickers).hasSize(2); + assertThat(picker.routePickers).doesNotContainKey(routeMatch2); + assertThat(picker.routePickers.get(routeMatch3).pickSubchannel(mock(PickSubchannelArgs.class))) + .isEqualTo(PickResult.withNoResult()); assertThat(fakeClock.numPendingTasks()) - .isEqualTo(1); // (delayed) shutdown because "action_a" is removed + .isEqualTo(1); // (delayed) shutdown because "actionB" is removed assertThat(childBalancer1.shutdown).isFalse(); + assertThat(childBalancer2.shutdown).isFalse(); + assertThat(childBalancers).hasSize(3); - FakeLoadBalancer childBalancer3 = childBalancers.get(1); - FakeLoadBalancer childBalancer4 = childBalancers.get(2); - assertThat(childBalancer3.name).isEqualTo("policy_a"); - assertThat(childBalancer3).isNotSameInstanceAs(childBalancer1); - assertThat(childBalancer4.name).isEqualTo("policy_c"); + FakeLoadBalancer childBalancer3 = childBalancers.get(2); + assertThat(childBalancer3.name).isEqualTo("policy_c"); + assertThat(childBalancer3.config).isEqualTo(lbConfigInventory.get("actionC")); - // Simulate subchannel state update from the leaf policy. + fakeClock.forwardTime( + XdsRoutingLoadBalancer.DELAYED_ACTION_DELETION_TIME_MINUTES, TimeUnit.MINUTES); + assertThat(childBalancer2.shutdown).isTrue(); + } + + @Test + public void updateWithActionPolicyChange() { + deliverResolvedAddresses(ImmutableMap.of(new Route(routeMatch1, "actionA"), "policy_a")); + FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); + assertThat(childBalancer.name).isEqualTo("policy_a"); + assertThat(childBalancer.config).isEqualTo(lbConfigInventory.get("actionA")); + + deliverResolvedAddresses(ImmutableMap.of(new Route(routeMatch1, "actionA"), "policy_b")); + assertThat(childBalancer.shutdown).isTrue(); // immediate shutdown as the it was not ready + assertThat(Iterables.getOnlyElement(childBalancers).name).isEqualTo("policy_b"); + assertThat(Iterables.getOnlyElement(childBalancers).config) + .isEqualTo(lbConfigInventory.get("actionA")); + } + + @Test + public void updateBalancingStateFromChildBalancers() { + deliverResolvedAddresses( + ImmutableMap.of( + new Route(routeMatch1, "actionA"), "policy_a", + new Route(routeMatch2, "actionB"), "policy_b")); + + assertThat(childBalancers).hasSize(2); + FakeLoadBalancer childBalancer1 = childBalancers.get(0); + FakeLoadBalancer childBalancer2 = childBalancers.get(1); Subchannel subchannel1 = mock(Subchannel.class); Subchannel subchannel2 = mock(Subchannel.class); - Subchannel subchannel3 = mock(Subchannel.class); childBalancer1.deliverSubchannelState(subchannel1, ConnectivityState.READY); - childBalancer3.deliverSubchannelState(subchannel2, ConnectivityState.CONNECTING); - childBalancer4.deliverSubchannelState(subchannel3, ConnectivityState.READY); - ArgumentCaptor pickerCaptor = ArgumentCaptor.forClass(null); verify(helper).updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture()); RouteMatchingSubchannelPicker picker = (RouteMatchingSubchannelPicker) pickerCaptor.getValue(); - assertThat(picker.routePickers).hasSize(3); + assertThat(picker.routePickers).hasSize(2); assertThat( picker.routePickers.get(routeMatch1) .pickSubchannel(mock(PickSubchannelArgs.class)).getSubchannel()) - .isSameInstanceAs(subchannel2); // routeMatch1 -> action_b -> policy_a -> subchannel2 + .isEqualTo(subchannel1); + assertThat(picker.routePickers.get(routeMatch2).pickSubchannel(mock(PickSubchannelArgs.class))) + .isEqualTo(PickResult.withNoResult()); + + childBalancer2.deliverSubchannelState(subchannel2, ConnectivityState.READY); + verify(helper, times(2)) + .updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture()); + picker = (RouteMatchingSubchannelPicker) pickerCaptor.getValue(); assertThat( picker.routePickers.get(routeMatch2) .pickSubchannel(mock(PickSubchannelArgs.class)).getSubchannel()) - .isSameInstanceAs(subchannel3); // routeMatch2 -> action_c -> policy_c -> subchannel3 + .isEqualTo(subchannel2); + } + + @Test + public void updateBalancingStateFromDeactivatedChildBalancer() { + FakeLoadBalancer balancer = + deliverAddressesAndUpdateToRemoveChildPolicy( + new Route(routeMatch1, "actionA"), "policy_a"); + Subchannel subchannel = mock(Subchannel.class); + balancer.deliverSubchannelState(subchannel, ConnectivityState.READY); + verify(helper, never()).updateBalancingState( + eq(ConnectivityState.READY), any(SubchannelPicker.class)); + + deliverResolvedAddresses(ImmutableMap.of(new Route(routeMatch1, "actionA"), "policy_a")); + verify(helper).updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture()); + RouteMatchingSubchannelPicker picker = (RouteMatchingSubchannelPicker) pickerCaptor.getValue(); assertThat( - picker.routePickers.get(routeMatch3) + picker.routePickers.get(routeMatch1) .pickSubchannel(mock(PickSubchannelArgs.class)).getSubchannel()) - .isSameInstanceAs(subchannel3); // routeMatch3 -> action_c -> policy_c -> subchannel3 + .isEqualTo(subchannel); + } - // Error propagation from upstream policies. - Status error = Status.UNAVAILABLE.withDescription("network error"); + @Test + public void errorPropagation() { + Status error = Status.UNAVAILABLE.withDescription("resolver error"); xdsRoutingLoadBalancer.handleNameResolutionError(error); - assertThat(childBalancer1.upstreamError).isNull(); - assertThat(childBalancer3.upstreamError).isEqualTo(error); - assertThat(childBalancer4.upstreamError).isEqualTo(error); - fakeClock.forwardTime( - XdsRoutingLoadBalancer.DELAYED_ACTION_DELETION_TIME_MINUTES, TimeUnit.MINUTES); - assertThat(childBalancer1.shutdown).isTrue(); + verify(helper).updateBalancingState( + eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); + PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(result.getStatus().getDescription()).isEqualTo("resolver error"); - xdsRoutingLoadBalancer.shutdown(); - assertThat(childBalancer3.shutdown).isTrue(); - assertThat(childBalancer4.shutdown).isTrue(); + deliverResolvedAddresses( + ImmutableMap.of( + new Route(routeMatch1, "actionA"), "policy_a", + new Route(routeMatch2, "actionB"), "policy_b")); + + assertThat(childBalancers).hasSize(2); + FakeLoadBalancer childBalancer1 = childBalancers.get(0); + FakeLoadBalancer childBalancer2 = childBalancers.get(1); + + xdsRoutingLoadBalancer.handleNameResolutionError(error); + assertThat(childBalancer1.upstreamError.getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(childBalancer1.upstreamError.getDescription()).isEqualTo("resolver error"); + assertThat(childBalancer2.upstreamError.getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(childBalancer2.upstreamError.getDescription()).isEqualTo("resolver error"); + } + + @Test + public void errorPropagationToDeactivatedChildBalancer() { + FakeLoadBalancer balancer = + deliverAddressesAndUpdateToRemoveChildPolicy( + new Route(routeMatch1, "actionA"), "policy_a"); + xdsRoutingLoadBalancer.handleNameResolutionError( + Status.UNKNOWN.withDescription("unknown error")); + assertThat(balancer.upstreamError).isNull(); + } + + private FakeLoadBalancer deliverAddressesAndUpdateToRemoveChildPolicy( + Route route, String childPolicyName) { + lbConfigInventory.put("actionX", null); + Route routeX = + new Route( + new RouteMatch( + new PathMatcher( + "/XService/xMethod", null, null), + Collections.emptyList(), + null), + "actionX"); + deliverResolvedAddresses( + ImmutableMap.of(route, childPolicyName, routeX, "policy_x")); + + verify(helper, atLeastOnce()).updateBalancingState( + eq(ConnectivityState.CONNECTING), any(SubchannelPicker.class)); + assertThat(childBalancers).hasSize(2); + FakeLoadBalancer balancer = childBalancers.get(0); + + deliverResolvedAddresses(ImmutableMap.of(routeX, "policy_x")); + verify(helper, atLeast(2)).updateBalancingState( + eq(ConnectivityState.CONNECTING), any(SubchannelPicker.class)); + assertThat(Iterables.getOnlyElement(fakeClock.getPendingTasks()).getDelay(TimeUnit.MINUTES)) + .isEqualTo(XdsRoutingLoadBalancer.DELAYED_ACTION_DELETION_TIME_MINUTES); + return balancer; + } + + private void deliverResolvedAddresses(final Map childPolicies) { + syncContext.execute(new Runnable() { + @Override + public void run() { + xdsRoutingLoadBalancer + .handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(Collections.emptyList()) + .setLoadBalancingPolicyConfig(buildConfig(childPolicies)) + .build()); + } + }); + } + + private XdsRoutingConfig buildConfig(Map childPolicies) { + Map childPolicySelections = new LinkedHashMap<>(); + List routeList = new ArrayList<>(); + for (Route route : childPolicies.keySet()) { + String childActionName = route.getActionName(); + String childPolicyName = childPolicies.get(route); + Object childConfig = lbConfigInventory.get(childActionName); + PolicySelection policy = + new PolicySelection(new FakeLoadBalancerProvider(childPolicyName), null, childConfig); + childPolicySelections.put(childActionName, policy); + routeList.add(route); + } + return new XdsRoutingConfig(routeList, childPolicySelections); } @Test