From 5ca8054a00c3f5614a98b4293c6a24dd97b69c7c Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Fri, 24 Jan 2020 01:28:10 -0800 Subject: [PATCH 1/7] Change DNS resolver to return balancer addresses as attributes of resolution result. Change AutoConfiguredLoadBalancerFactory to choose LB policy purely based on lb policy config in service config. GrpclbLoadBalancer populates balancer address from attributes instead of from addresses. --- .../AutoConfiguredLoadBalancerFactory.java | 118 +---- .../io/grpc/internal/DnsNameResolver.java | 19 +- .../java/io/grpc/internal/GrpcAttributes.java | 4 + ...AutoConfiguredLoadBalancerFactoryTest.java | 375 ++++--------- .../io/grpc/internal/DnsNameResolverTest.java | 63 ++- .../java/io/grpc/grpclb/GrpclbConstants.java | 9 + .../io/grpc/grpclb/GrpclbLoadBalancer.java | 26 +- .../grpc/grpclb/GrpclbLoadBalancerTest.java | 497 ++++++++++-------- 8 files changed, 481 insertions(+), 630 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java b/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java index 263a20d4a8d..466999b37bc 100644 --- a/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java +++ b/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java @@ -44,15 +44,11 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.logging.Logger; import javax.annotation.Nullable; // TODO(creamsoup) fully deprecate LoadBalancer.ATTR_LOAD_BALANCING_CONFIG @SuppressWarnings("deprecation") public final class AutoConfiguredLoadBalancerFactory { - private static final Logger logger = - Logger.getLogger(AutoConfiguredLoadBalancerFactory.class.getName()); - private static final String GRPCLB_POLICY_NAME = "grpclb"; private final LoadBalancerRegistry registry; private final String defaultPolicy; @@ -92,7 +88,6 @@ public final class AutoConfiguredLoadBalancer { private final Helper helper; private LoadBalancer delegate; private LoadBalancerProvider delegateProvider; - private boolean roundRobinDueToGrpclbDepMissing; AutoConfiguredLoadBalancer(Helper helper) { this.helper = helper; @@ -125,48 +120,53 @@ Status tryHandleResolvedAddresses(ResolvedAddresses resolvedAddresses) { } PolicySelection policySelection = (PolicySelection) resolvedAddresses.getLoadBalancingPolicyConfig(); - ResolvedPolicySelection resolvedSelection; - try { - resolvedSelection = resolveLoadBalancerProvider(servers, policySelection); - } catch (PolicyException e) { - Status s = Status.INTERNAL.withDescription(e.getMessage()); - helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, new FailingPicker(s)); - delegate.shutdown(); - delegateProvider = null; - delegate = new NoopLoadBalancer(); - return Status.OK; + if (policySelection == null) { + LoadBalancerProvider defaultProvider; + try { + defaultProvider = getProviderOrThrow(defaultPolicy, "using default policy"); + } catch (PolicyException e) { + Status s = Status.INTERNAL.withDescription(e.getMessage()); + helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, new FailingPicker(s)); + delegate.shutdown(); + delegateProvider = null; + delegate = new NoopLoadBalancer(); + return Status.OK; + } + policySelection = + new PolicySelection(defaultProvider, /* rawConfig= */ null, /* config= */ null); } - PolicySelection selection = resolvedSelection.policySelection; if (delegateProvider == null - || !selection.provider.getPolicyName().equals(delegateProvider.getPolicyName())) { + || !policySelection.provider.getPolicyName().equals(delegateProvider.getPolicyName())) { helper.updateBalancingState(ConnectivityState.CONNECTING, new EmptyPicker()); delegate.shutdown(); - delegateProvider = selection.provider; + delegateProvider = policySelection.provider; LoadBalancer old = delegate; delegate = delegateProvider.newLoadBalancer(helper); helper.getChannelLogger().log( ChannelLogLevel.INFO, "Load balancer changed from {0} to {1}", old.getClass().getSimpleName(), delegate.getClass().getSimpleName()); } - Object lbConfig = selection.config; + Object lbConfig = policySelection.config; if (lbConfig != null) { helper.getChannelLogger().log( - ChannelLogLevel.DEBUG, "Load-balancing config: {0}", selection.config); + ChannelLogLevel.DEBUG, "Load-balancing config: {0}", policySelection.config); attributes = - attributes.toBuilder().set(ATTR_LOAD_BALANCING_CONFIG, selection.rawConfig).build(); + attributes.toBuilder() + .set(ATTR_LOAD_BALANCING_CONFIG, policySelection.rawConfig) + .build(); } LoadBalancer delegate = getDelegate(); - if (resolvedSelection.serverList.isEmpty() + if (resolvedAddresses.getAddresses().isEmpty() && !delegate.canHandleEmptyAddressListFromNameResolution()) { return Status.UNAVAILABLE.withDescription( "NameResolver returned no usable address. addrs=" + servers + ", attrs=" + attributes); } else { delegate.handleResolvedAddresses( ResolvedAddresses.newBuilder() - .setAddresses(resolvedSelection.serverList) + .setAddresses(resolvedAddresses.getAddresses()) .setAttributes(attributes) .setLoadBalancingPolicyConfig(lbConfig) .build()); @@ -206,78 +206,6 @@ void setDelegate(LoadBalancer lb) { LoadBalancerProvider getDelegateProvider() { return delegateProvider; } - - /** - * Resolves a load balancer based on given criteria. If policySelection is {@code null} and - * given servers contains any gRPC LB addresses, it will fall back to "grpclb". If no gRPC LB - * addresses are not present, it will fall back to {@link #defaultPolicy}. - * - * @param servers The list of servers reported - * @param policySelection the selected policy from raw service config - * @return the resolved policy selection - */ - @VisibleForTesting - ResolvedPolicySelection resolveLoadBalancerProvider( - List servers, @Nullable PolicySelection policySelection) - throws PolicyException { - // Check for balancer addresses - boolean haveBalancerAddress = false; - List backendAddrs = new ArrayList<>(); - for (EquivalentAddressGroup s : servers) { - if (s.getAttributes().get(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY) != null) { - haveBalancerAddress = true; - } else { - backendAddrs.add(s); - } - } - - if (policySelection != null) { - String policyName = policySelection.provider.getPolicyName(); - return new ResolvedPolicySelection( - policySelection, policyName.equals(GRPCLB_POLICY_NAME) ? servers : backendAddrs); - } - - if (haveBalancerAddress) { - // This is a special case where the existence of balancer address in the resolved address - // selects "grpclb" policy if the service config couldn't select a policy - LoadBalancerProvider grpclbProvider = registry.getProvider(GRPCLB_POLICY_NAME); - if (grpclbProvider == null) { - if (backendAddrs.isEmpty()) { - throw new PolicyException( - "Received ONLY balancer addresses but grpclb runtime is missing"); - } - if (!roundRobinDueToGrpclbDepMissing) { - // We don't log the warning every time we have an update. - roundRobinDueToGrpclbDepMissing = true; - String errorMsg = "Found balancer addresses but grpclb runtime is missing." - + " Will use round_robin. Please include grpc-grpclb in your runtime dependencies."; - helper.getChannelLogger().log(ChannelLogLevel.ERROR, errorMsg); - logger.warning(errorMsg); - } - return new ResolvedPolicySelection( - new PolicySelection( - getProviderOrThrow( - "round_robin", "received balancer addresses but grpclb runtime is missing"), - /* rawConfig = */ null, - /* config= */ null), - backendAddrs); - } - return new ResolvedPolicySelection( - new PolicySelection( - grpclbProvider, /* rawConfig= */ null, /* config= */ null), servers); - } - // No balancer address this time. If balancer address shows up later, we want to make sure - // the warning is logged one more time. - roundRobinDueToGrpclbDepMissing = false; - - // No config nor balancer address. Use default. - return new ResolvedPolicySelection( - new PolicySelection( - getProviderOrThrow(defaultPolicy, "using default policy"), - /* rawConfig= */ null, - /* config= */ null), - servers); - } } private LoadBalancerProvider getProviderOrThrow(String policy, String choiceReason) diff --git a/core/src/main/java/io/grpc/internal/DnsNameResolver.java b/core/src/main/java/io/grpc/internal/DnsNameResolver.java index f81c577820f..150f962738d 100644 --- a/core/src/main/java/io/grpc/internal/DnsNameResolver.java +++ b/core/src/main/java/io/grpc/internal/DnsNameResolver.java @@ -293,19 +293,24 @@ public void run() { Status.UNAVAILABLE.withDescription("Unable to resolve host " + host).withCause(e)); return; } + if (resolutionResults.addresses.isEmpty() && resolutionResults.balancerAddresses.isEmpty()) { + savedListener.onError(Status.UNAVAILABLE.withDescription( + "No DNS backend or balancer addresses found for " + host)); + return; + } // Each address forms an EAG List servers = new ArrayList<>(); for (InetAddress inetAddr : resolutionResults.addresses) { servers.add(new EquivalentAddressGroup(new InetSocketAddress(inetAddr, port))); } - servers.addAll(resolutionResults.balancerAddresses); - if (servers.isEmpty()) { - savedListener.onError(Status.UNAVAILABLE.withDescription( - "No DNS backend or balancer addresses found for " + host)); - return; - } - ResolutionResult.Builder resultBuilder = ResolutionResult.newBuilder().setAddresses(servers); + ResolutionResult.Builder resultBuilder = + ResolutionResult.newBuilder() + .setAddresses(servers) + .setAttributes( + Attributes.newBuilder() + .set(GrpcAttributes.ATTR_LB_ADDRS, resolutionResults.balancerAddresses) + .build()); if (!resolutionResults.txtRecords.isEmpty()) { ConfigOrError rawServiceConfig = parseServiceConfig(resolutionResults.txtRecords, random, getLocalHostname()); diff --git a/core/src/main/java/io/grpc/internal/GrpcAttributes.java b/core/src/main/java/io/grpc/internal/GrpcAttributes.java index b8138d7f26f..5f4618d9ea8 100644 --- a/core/src/main/java/io/grpc/internal/GrpcAttributes.java +++ b/core/src/main/java/io/grpc/internal/GrpcAttributes.java @@ -21,6 +21,7 @@ import io.grpc.Grpc; import io.grpc.NameResolver; import io.grpc.SecurityLevel; +import java.util.List; import java.util.Map; /** @@ -37,6 +38,9 @@ public final class GrpcAttributes { public static final Attributes.Key> NAME_RESOLVER_SERVICE_CONFIG = Attributes.Key.create("service-config"); + public static final Attributes.Key> ATTR_LB_ADDRS = + Attributes.Key.create("io.grpc.grpclb.lbAddrs"); + /** * The naming authority of a gRPC LB server address. It is an address-group-level attribute, * present when the address group is a LoadBalancer. diff --git a/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java b/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java index 9232772778a..e534c2bffba 100644 --- a/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java +++ b/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java @@ -19,13 +19,10 @@ import static com.google.common.truth.Truth.assertThat; import static io.grpc.LoadBalancer.ATTR_LOAD_BALANCING_CONFIG; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.same; -import static org.mockito.ArgumentMatchers.startsWith; -import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -50,23 +47,18 @@ import io.grpc.LoadBalancer.SubchannelStateListener; import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerRegistry; -import io.grpc.ManagedChannel; import io.grpc.NameResolver.ConfigOrError; import io.grpc.Status; -import io.grpc.SynchronizationContext; import io.grpc.grpclb.GrpclbLoadBalancerProvider; import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer; -import io.grpc.internal.AutoConfiguredLoadBalancerFactory.PolicyException; import io.grpc.internal.AutoConfiguredLoadBalancerFactory.PolicySelection; -import io.grpc.internal.AutoConfiguredLoadBalancerFactory.ResolvedPolicySelection; import io.grpc.util.ForwardingLoadBalancerHelper; +import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -308,37 +300,42 @@ public void handleResolvedAddressGroups_propagateLbConfigToDelegate() throws Exc } @Test - public void handleResolvedAddressGroups_propagateOnlyBackendAddrsToDelegate() throws Exception { - // This case only happens when grpclb is missing. We will use a local registry - LoadBalancerRegistry registry = new LoadBalancerRegistry(); - registry.register(new PickFirstLoadBalancerProvider()); - registry.register( - new FakeLoadBalancerProvider( - "round_robin", testLbBalancer, /* nextParsedLbPolicyConfig= */ null)); + public void handleResolvedAddressGroups_propagateAddrsToDelegate() throws Exception { + Map rawServiceConfig = + parseConfig("{\"loadBalancingConfig\": [ {\"test_lb\": { \"setting1\": \"high\" } } ] }"); + ConfigOrError lbConfigs = lbf.parseLoadBalancerPolicy(rawServiceConfig, channelLogger); + assertThat(lbConfigs.getConfig()).isNotNull(); - final List servers = - Arrays.asList( - new EquivalentAddressGroup(new SocketAddress(){}), - new EquivalentAddressGroup( - new SocketAddress(){}, - Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "ok").build())); Helper helper = new TestHelper(); - AutoConfiguredLoadBalancer lb = new AutoConfiguredLoadBalancerFactory( - registry, GrpcUtil.DEFAULT_LB_POLICY).newLoadBalancer(helper); + AutoConfiguredLoadBalancer lb = lbf.newLoadBalancer(helper); + List servers = + Collections.singletonList(new EquivalentAddressGroup(new InetSocketAddress(8080){})); Status handleResult = lb.tryHandleResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(servers) - .setAttributes(Attributes.EMPTY) + .setLoadBalancingPolicyConfig(lbConfigs.getConfig()) .build()); + verify(testLbBalancerProvider).newLoadBalancer(same(helper)); assertThat(handleResult.getCode()).isEqualTo(Status.Code.OK); assertThat(lb.getDelegate()).isSameInstanceAs(testLbBalancer); - verify(testLbBalancer).handleResolvedAddresses( + ArgumentCaptor resultCaptor = + ArgumentCaptor.forClass(ResolvedAddresses.class); + verify(testLbBalancer).handleResolvedAddresses(resultCaptor.capture()); + assertThat(resultCaptor.getValue().getAddresses()).containsExactlyElementsIn(servers).inOrder(); + + servers = + Collections.singletonList(new EquivalentAddressGroup(new InetSocketAddress(9090){})); + handleResult = lb.tryHandleResolvedAddresses( ResolvedAddresses.newBuilder() - .setAddresses(Collections.singletonList(servers.get(0))) - .setAttributes(Attributes.EMPTY) + .setAddresses(servers) + .setLoadBalancingPolicyConfig(lbConfigs.getConfig()) .build()); + + assertThat(handleResult.getCode()).isEqualTo(Status.Code.OK); + verify(testLbBalancer, times(2)).handleResolvedAddresses(resultCaptor.capture()); + assertThat(resultCaptor.getValue().getAddresses()).containsExactlyElementsIn(servers).inOrder(); } @Test @@ -392,267 +389,79 @@ public void handleResolvedAddressGroups_delegateAcceptsEmptyAddressList() } @Test - public void decideLoadBalancerProvider_noBalancerAddresses_noServiceConfig_pickFirst() - throws Exception { - AutoConfiguredLoadBalancer lb = lbf.newLoadBalancer(new TestHelper()); - PolicySelection policySelection = null; - List servers = - Collections.singletonList(new EquivalentAddressGroup(new SocketAddress(){})); - ResolvedPolicySelection selection = lb.resolveLoadBalancerProvider(servers, policySelection); - - assertThat(selection.policySelection.provider) - .isInstanceOf(PickFirstLoadBalancerProvider.class); - assertThat(selection.serverList).isEqualTo(servers); - assertThat(selection.policySelection.config).isNull(); - verifyZeroInteractions(channelLogger); - } - - @Test - public void decideLoadBalancerProvider_noBalancerAddresses_noServiceConfig_customDefault() - throws Exception { - AutoConfiguredLoadBalancer lb = new AutoConfiguredLoadBalancerFactory("test_lb") - .newLoadBalancer(new TestHelper()); - PolicySelection policySelection = null; - List servers = - Collections.singletonList(new EquivalentAddressGroup(new SocketAddress(){})); - ResolvedPolicySelection selection = lb.resolveLoadBalancerProvider(servers, policySelection); - - assertThat(selection.policySelection.provider).isSameInstanceAs(testLbBalancerProvider); - assertThat(selection.serverList).isEqualTo(servers); - assertThat(selection.policySelection.config).isNull(); - verifyZeroInteractions(channelLogger); - } - - @Test - public void decideLoadBalancerProvider_oneBalancer_noServiceConfig_grpclb() throws Exception { - AutoConfiguredLoadBalancer lb = lbf.newLoadBalancer(new TestHelper()); - PolicySelection policySelection = null; - List servers = - Collections.singletonList( - new EquivalentAddressGroup( - new SocketAddress(){}, - Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "ok").build())); - ResolvedPolicySelection selection = lb.resolveLoadBalancerProvider(servers, policySelection); - - assertThat(selection.policySelection.provider).isInstanceOf(GrpclbLoadBalancerProvider.class); - assertThat(selection.serverList).isEqualTo(servers); - assertThat(selection.policySelection.config).isNull(); - verifyZeroInteractions(channelLogger); - } - - @Test - public void decideLoadBalancerProvider_serviceConfigLbPolicy() throws Exception { - AutoConfiguredLoadBalancer lb = lbf.newLoadBalancer(new TestHelper()); - Map rawServiceConfig = - parseConfig("{\"loadBalancingPolicy\": \"round_robin\"}"); - - ConfigOrError lbConfig = lbf.parseLoadBalancerPolicy(rawServiceConfig, channelLogger); - assertThat(lbConfig.getConfig()).isNotNull(); - PolicySelection policySelection = (PolicySelection) lbConfig.getConfig(); - List servers = - Arrays.asList( - new EquivalentAddressGroup( - new SocketAddress(){}, - Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "ok").build()), - new EquivalentAddressGroup( - new SocketAddress(){})); - List backends = Arrays.asList(servers.get(1)); - ResolvedPolicySelection selection = lb.resolveLoadBalancerProvider(servers, policySelection); - - assertThat(selection.policySelection.provider.getClass().getName()).isEqualTo( - "io.grpc.util.SecretRoundRobinLoadBalancerProvider$Provider"); - assertThat(selection.serverList).isEqualTo(backends); - verifyZeroInteractions(channelLogger); - } - - @Test - public void decideLoadBalancerProvider_serviceConfigLbConfig() throws Exception { - AutoConfiguredLoadBalancer lb = lbf.newLoadBalancer(new TestHelper()); + public void handleResolvedAddressGroups_useSelectedLbPolicy() throws Exception { Map rawServiceConfig = parseConfig("{\"loadBalancingConfig\": [{\"round_robin\": {}}]}"); + ConfigOrError lbConfigs = lbf.parseLoadBalancerPolicy(rawServiceConfig, channelLogger); + assertThat(lbConfigs.getConfig()).isNotNull(); + assertThat(((PolicySelection) lbConfigs.getConfig()).provider.getClass().getName()) + .isEqualTo("io.grpc.util.SecretRoundRobinLoadBalancerProvider$Provider"); - ConfigOrError lbConfig = lbf.parseLoadBalancerPolicy(rawServiceConfig, channelLogger); - assertThat(lbConfig.getConfig()).isNotNull(); - PolicySelection policySelection = (PolicySelection) lbConfig.getConfig(); - List servers = - Arrays.asList( - new EquivalentAddressGroup( - new SocketAddress(){}, - Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "ok").build()), - new EquivalentAddressGroup( - new SocketAddress(){})); - List backends = Arrays.asList(servers.get(1)); - ResolvedPolicySelection selection = lb.resolveLoadBalancerProvider(servers, policySelection); - - assertThat(selection.policySelection.provider.getClass().getName()).isEqualTo( - "io.grpc.util.SecretRoundRobinLoadBalancerProvider$Provider"); - assertThat(selection.serverList).isEqualTo(backends); - verifyZeroInteractions(channelLogger); - } - - @Test - public void decideLoadBalancerProvider_grpclbConfigPropagated() throws Exception { - AutoConfiguredLoadBalancer lb = lbf.newLoadBalancer(new TestHelper()); - Map rawServiceConfig = - parseConfig( - "{\"loadBalancingConfig\": [" - + "{\"grpclb\": {\"childPolicy\": [ {\"pick_first\": {} } ] } }" - + "] }"); - ConfigOrError lbConfig = lbf.parseLoadBalancerPolicy(rawServiceConfig, channelLogger); - assertThat(lbConfig.getConfig()).isNotNull(); - PolicySelection policySelection = (PolicySelection) lbConfig.getConfig(); - - List servers = - Collections.singletonList( - new EquivalentAddressGroup( - new SocketAddress(){}, - Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "ok").build())); - ResolvedPolicySelection selection = lb.resolveLoadBalancerProvider(servers, policySelection); - - assertThat(selection.policySelection.provider).isInstanceOf(GrpclbLoadBalancerProvider.class); - assertThat(selection.serverList).isEqualTo(servers); - assertThat(selection.policySelection.config) - .isEqualTo(((PolicySelection) lbConfig.getConfig()).config); - verifyZeroInteractions(channelLogger); - } - - @Test - public void decideLoadBalancerProvider_policyUnavailButGrpclbAddressPresent() throws Exception { - AutoConfiguredLoadBalancer lb = lbf.newLoadBalancer(new TestHelper()); - - List servers = - Collections.singletonList( - new EquivalentAddressGroup( - new SocketAddress(){}, - Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "ok").build())); - ResolvedPolicySelection selection = lb.resolveLoadBalancerProvider(servers, null); - - assertThat(selection.policySelection.provider).isInstanceOf(GrpclbLoadBalancerProvider.class); - assertThat(selection.serverList).isEqualTo(servers); - assertThat(selection.policySelection.config).isNull(); - verifyZeroInteractions(channelLogger); - } - - @Test - public void decideLoadBalancerProvider_grpclbProviderNotFound_fallbackToRoundRobin() - throws Exception { - LoadBalancerRegistry registry = new LoadBalancerRegistry(); - registry.register(new PickFirstLoadBalancerProvider()); - LoadBalancerProvider fakeRoundRobinProvider = - new FakeLoadBalancerProvider("round_robin", testLbBalancer, null); - registry.register(fakeRoundRobinProvider); - AutoConfiguredLoadBalancer lb = new AutoConfiguredLoadBalancerFactory( - registry, GrpcUtil.DEFAULT_LB_POLICY).newLoadBalancer(new TestHelper()); - List servers = - Arrays.asList( - new EquivalentAddressGroup( - new SocketAddress(){}, - Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "ok").build()), - new EquivalentAddressGroup(new SocketAddress(){})); - ResolvedPolicySelection selection = lb.resolveLoadBalancerProvider(servers, null); - - assertThat(selection.policySelection.provider).isSameInstanceAs(fakeRoundRobinProvider); - assertThat(selection.policySelection.config).isNull(); - verify(channelLogger).log( - eq(ChannelLogLevel.ERROR), - startsWith("Found balancer addresses but grpclb runtime is missing")); - - // Called for the second time, the warning is only logged once - selection = lb.resolveLoadBalancerProvider(servers, null); - - assertThat(selection.policySelection.provider).isSameInstanceAs(fakeRoundRobinProvider); - assertThat(selection.policySelection.config).isNull(); - // Balancer addresses are filtered out in the server list passed to round_robin - assertThat(selection.serverList).containsExactly(servers.get(1)); - verifyNoMoreInteractions(channelLogger);; + final List servers = + Collections.singletonList(new EquivalentAddressGroup(new SocketAddress(){})); + Helper helper = new TestHelper() { + @Override + public Subchannel createSubchannel(CreateSubchannelArgs args) { + assertThat(args.getAddresses()).isEqualTo(servers); + return new TestSubchannel(args); + } + }; + AutoConfiguredLoadBalancer lb = lbf.newLoadBalancer(helper); + Status handleResult = lb.tryHandleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers) + .setLoadBalancingPolicyConfig(lbConfigs.getConfig()) + .build()); + assertThat(handleResult.getCode()).isEqualTo(Status.Code.OK); + assertThat(lb.getDelegate().getClass().getName()) + .isEqualTo("io.grpc.util.RoundRobinLoadBalancer"); } @Test - public void decideLoadBalancerProvider_grpclbProviderNotFound_noBackendAddress() - throws Exception { - LoadBalancerRegistry registry = new LoadBalancerRegistry(); - registry.register(new PickFirstLoadBalancerProvider()); - registry.register(new FakeLoadBalancerProvider("round_robin", testLbBalancer, null)); - AutoConfiguredLoadBalancer lb = new AutoConfiguredLoadBalancerFactory( - registry, GrpcUtil.DEFAULT_LB_POLICY).newLoadBalancer(new TestHelper()); - List servers = - Collections.singletonList( - new EquivalentAddressGroup( - new SocketAddress(){}, - Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "ok").build())); - try { - lb.resolveLoadBalancerProvider(servers, null); - fail("Should throw"); - } catch (PolicyException e) { - assertThat(e) - .hasMessageThat() - .isEqualTo("Received ONLY balancer addresses but grpclb runtime is missing"); - } + public void handleResolvedAddressGroups_noLbPolicySelected_defaultToPickFirst() { + final List servers = + Collections.singletonList(new EquivalentAddressGroup(new SocketAddress(){})); + Helper helper = new TestHelper() { + @Override + public Subchannel createSubchannel(CreateSubchannelArgs args) { + assertThat(args.getAddresses()).isEqualTo(servers); + return new TestSubchannel(args); + } + }; + AutoConfiguredLoadBalancer lb = lbf.newLoadBalancer(helper); + Status handleResult = lb.tryHandleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers) + .setLoadBalancingPolicyConfig(null) + .build()); + assertThat(handleResult.getCode()).isEqualTo(Status.Code.OK); + assertThat(lb.getDelegate()).isInstanceOf(PickFirstLoadBalancer.class); } @Test - public void decideLoadBalancerProvider_serviceConfigLbConfigOverridesDefault() throws Exception { - AutoConfiguredLoadBalancer lb = lbf.newLoadBalancer(new TestHelper()); - Map rawServiceConfig = - parseConfig("{\"loadBalancingConfig\": [ {\"round_robin\": {} } ] }"); - ConfigOrError lbConfigs = lbf.parseLoadBalancerPolicy(rawServiceConfig, channelLogger); - assertThat(lbConfigs.getConfig()).isNotNull(); - PolicySelection policySelection = (PolicySelection) lbConfigs.getConfig(); + public void handleResolvedAddressGroups_noLbPolicySelected_defaultToCustomDefault() { + AutoConfiguredLoadBalancer lb = new AutoConfiguredLoadBalancerFactory("test_lb") + .newLoadBalancer(new TestHelper()); List servers = Collections.singletonList(new EquivalentAddressGroup(new SocketAddress(){})); - - ResolvedPolicySelection selection = lb.resolveLoadBalancerProvider(servers, policySelection); - - assertThat(selection.policySelection.provider.getClass().getName()).isEqualTo( - "io.grpc.util.SecretRoundRobinLoadBalancerProvider$Provider"); - verifyZeroInteractions(channelLogger); + Status handleResult = lb.tryHandleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers) + .setLoadBalancingPolicyConfig(null) + .build()); + assertThat(handleResult.getCode()).isEqualTo(Status.Code.OK); + assertThat(lb.getDelegate()).isSameInstanceAs(testLbBalancer); } @Test public void channelTracing_lbPolicyChanged() throws Exception { - final FakeClock clock = new FakeClock(); List servers = Collections.singletonList(new EquivalentAddressGroup(new SocketAddress(){})); Helper helper = new TestHelper() { - @Override - @Deprecated - public Subchannel createSubchannel(List addrs, Attributes attrs) { - return new TestSubchannel(CreateSubchannelArgs.newBuilder() - .setAddresses(addrs) - .setAttributes(attrs) - .build()); - } - @Override public Subchannel createSubchannel(CreateSubchannelArgs args) { return new TestSubchannel(args); } - - @Override - public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) { - return mock(ManagedChannel.class, RETURNS_DEEP_STUBS); - } - - @Override - public String getAuthority() { - return "fake_authority"; - } - - @Override - public SynchronizationContext getSynchronizationContext() { - return new SynchronizationContext( - new Thread.UncaughtExceptionHandler() { - @Override - public void uncaughtException(Thread t, Throwable e) { - throw new AssertionError(e); - } - }); - } - - @Override - public ScheduledExecutorService getScheduledExecutorService() { - return clock.getScheduledExecutorService(); - } }; AutoConfiguredLoadBalancer lb = @@ -705,23 +514,6 @@ public ScheduledExecutorService getScheduledExecutorService() { eq("Load-balancing config: {0}"), eq(testLbParsedConfig.getConfig())); verifyNoMoreInteractions(channelLogger); - - servers = Collections.singletonList(new EquivalentAddressGroup( - new SocketAddress(){}, - Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "ok").build())); - handleResult = lb.tryHandleResolvedAddresses( - ResolvedAddresses.newBuilder() - .setAddresses(servers) - .setAttributes(Attributes.EMPTY) - .build()); - - assertThat(handleResult.getCode()).isEqualTo(Status.Code.OK); - verify(channelLogger).log( - eq(ChannelLogLevel.INFO), - eq("Load balancer changed from {0} to {1}"), - eq(testLbBalancer.getClass().getSimpleName()), eq("GrpclbLoadBalancer")); - - verifyNoMoreInteractions(channelLogger); } @Test @@ -834,6 +626,21 @@ public void parseLoadBalancerConfig_someProvidesAreNotAvailable() throws Excepti eq(new ArrayList<>(Collections.singletonList("magic_balancer")))); } + @Test + public void parseLoadBalancerConfig_lbConfigPropagated() throws Exception { + Map rawServiceConfig = + parseConfig( + "{\"loadBalancingConfig\": [" + + "{\"grpclb\": {\"childPolicy\": [ {\"pick_first\": {} } ] } }" + + "] }"); + ConfigOrError parsed = lbf.parseLoadBalancerPolicy(rawServiceConfig, channelLogger); + assertThat(parsed).isNotNull(); + assertThat(parsed.getConfig()).isNotNull(); + PolicySelection policySelection = (PolicySelection) parsed.getConfig(); + assertThat(policySelection.config).isNotNull(); + assertThat(policySelection.provider).isInstanceOf(GrpclbLoadBalancerProvider.class); + verifyZeroInteractions(channelLogger); + } public static class ForwardingLoadBalancer extends LoadBalancer { private final LoadBalancer delegate; diff --git a/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java b/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java index 192d03d3341..bfe97e21982 100644 --- a/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java +++ b/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java @@ -36,6 +36,7 @@ import com.google.common.collect.Iterables; import com.google.common.net.InetAddresses; import com.google.common.testing.FakeTicker; +import io.grpc.Attributes; import io.grpc.ChannelLogger; import io.grpc.EquivalentAddressGroup; import io.grpc.HttpConnectProxiedSocketAddress; @@ -154,7 +155,8 @@ private DnsNameResolver newResolver(String name, int defaultPort) { private DnsNameResolver newResolver(String name, int defaultPort, boolean isAndroid) { return newResolver( - name, defaultPort, GrpcUtil.NOOP_PROXY_DETECTOR, Stopwatch.createUnstarted(), isAndroid); + name, defaultPort, GrpcUtil.NOOP_PROXY_DETECTOR, Stopwatch.createUnstarted(), + isAndroid, false); } private DnsNameResolver newResolver( @@ -162,7 +164,7 @@ private DnsNameResolver newResolver( int defaultPort, ProxyDetector proxyDetector, Stopwatch stopwatch) { - return newResolver(name, defaultPort, proxyDetector, stopwatch, false); + return newResolver(name, defaultPort, proxyDetector, stopwatch, false, false); } private DnsNameResolver newResolver( @@ -170,7 +172,8 @@ private DnsNameResolver newResolver( final int defaultPort, final ProxyDetector proxyDetector, Stopwatch stopwatch, - boolean isAndroid) { + boolean isAndroid, + boolean enableSrv) { NameResolver.Args args = NameResolver.Args.newBuilder() .setDefaultPort(defaultPort) @@ -179,19 +182,34 @@ private DnsNameResolver newResolver( .setServiceConfigParser(mock(ServiceConfigParser.class)) .setChannelLogger(mock(ChannelLogger.class)) .build(); - return newResolver(name, stopwatch, isAndroid, args); + return newResolver(name, stopwatch, isAndroid, args, enableSrv); } private DnsNameResolver newResolver( String name, Stopwatch stopwatch, boolean isAndroid, NameResolver.Args args) { + return newResolver(name, stopwatch, isAndroid, args, /* enableSrv= */ false); + } + + private DnsNameResolver newResolver( + String name, + Stopwatch stopwatch, + boolean isAndroid, + NameResolver.Args args, + boolean enableSrv) { DnsNameResolver dnsResolver = new DnsNameResolver( - null, name, args, fakeExecutorResource, stopwatch, isAndroid, /* enableSrv= */ false); + null, name, args, fakeExecutorResource, stopwatch, isAndroid, enableSrv); // By default, using the mocked ResourceResolver to avoid I/O dnsResolver.setResourceResolver(new JndiResourceResolver(recordFetcher)); return dnsResolver; } + private DnsNameResolver newSrvEnabledResolver(String name, int defaultPort) { + return newResolver( + name, defaultPort, GrpcUtil.NOOP_PROXY_DETECTOR, Stopwatch.createUnstarted(), + false, true); + } + @Before public void setUp() { DnsNameResolver.enableJndi = true; @@ -531,6 +549,41 @@ private void resolveDefaultValue() throws Exception { verify(mockResolver, times(2)).resolveAddress(anyString()); } + @Test + public void resolve_balancerAddrsAsAttributes() throws Exception { + InetAddress backendAddr = InetAddress.getByAddress(new byte[] {127, 0, 0, 0}); + final EquivalentAddressGroup balancerAddr = + new EquivalentAddressGroup( + new SocketAddress() {}, + Attributes.newBuilder() + .set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "foo.example.com") + .build()); + String name = "foo.googleapis.com"; + + AddressResolver mockAddressResolver = mock(AddressResolver.class); + when(mockAddressResolver.resolveAddress(anyString())) + .thenReturn(Collections.singletonList(backendAddr)); + ResourceResolver mockResourceResolver = mock(ResourceResolver.class); + when(mockResourceResolver.resolveTxt(anyString())).thenReturn(Collections.emptyList()); + when(mockResourceResolver.resolveSrv(ArgumentMatchers.any(AddressResolver.class), anyString())) + .thenReturn(Collections.singletonList(balancerAddr)); + + DnsNameResolver resolver = newSrvEnabledResolver(name, 81); + resolver.setAddressResolver(mockAddressResolver); + resolver.setResourceResolver(mockResourceResolver); + + resolver.start(mockListener); + assertEquals(1, fakeExecutor.runDueTasks()); + verify(mockListener).onResult(resultCaptor.capture()); + ResolutionResult result = resultCaptor.getValue(); + InetSocketAddress resolvedBackendAddr = + (InetSocketAddress) Iterables.getOnlyElement( + Iterables.getOnlyElement(result.getAddresses()).getAddresses()); + assertThat(resolvedBackendAddr.getAddress()).isEqualTo(backendAddr); + assertThat(result.getAttributes().get(GrpcAttributes.ATTR_LB_ADDRS)) + .containsExactly(balancerAddr); + } + @Test public void resolveAll_nullResourceResolver() throws Exception { final String hostname = "addr.fake"; diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbConstants.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbConstants.java index 65f4832f540..0c02e93884f 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbConstants.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbConstants.java @@ -20,6 +20,7 @@ import io.grpc.EquivalentAddressGroup; import io.grpc.ExperimentalApi; import io.grpc.Metadata; +import java.util.List; /** * Constants for the GRPCLB load-balancer. @@ -41,5 +42,13 @@ public final class GrpclbConstants { static final Attributes.Key TOKEN_ATTRIBUTE_KEY = Attributes.Key.create("lb-token"); + @EquivalentAddressGroup.Attr + static final Attributes.Key> ATTR_LB_ADDRS = + io.grpc.internal.GrpcAttributes.ATTR_LB_ADDRS; + + @EquivalentAddressGroup.Attr + static final Attributes.Key ATTR_LB_ADDR_AUTHORITY = + io.grpc.internal.GrpcAttributes.ATTR_LB_ADDR_AUTHORITY; + private GrpclbConstants() { } } diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java index cd539bdb6ac..782f423ed5c 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java @@ -91,22 +91,23 @@ public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo n @Override @SuppressWarnings("deprecation") // TODO(creamsoup) migrate to use parsed service config public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { - List updatedServers = resolvedAddresses.getAddresses(); Attributes attributes = resolvedAddresses.getAttributes(); - // LB addresses and backend addresses are treated separately List newLbAddressGroups = new ArrayList<>(); - List newBackendServers = new ArrayList<>(); - for (EquivalentAddressGroup server : updatedServers) { - String lbAddrAuthority = server.getAttributes().get(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY); - if (lbAddrAuthority != null) { - newLbAddressGroups.add(new LbAddressGroup(server, lbAddrAuthority)); - } else { - newBackendServers.add(server); + List newLbAddresses = attributes.get(GrpcAttributes.ATTR_LB_ADDRS); + if (newLbAddresses != null) { + for (EquivalentAddressGroup lbAddr : newLbAddresses) { + String lbAddrAuthority = lbAddr.getAttributes().get(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY); + if (lbAddrAuthority == null) { + throw new AssertionError( + "This is a bug: LB address " + lbAddr + " does not have an authority."); + } + newLbAddressGroups.add(new LbAddressGroup(lbAddr, lbAddrAuthority)); } } newLbAddressGroups = Collections.unmodifiableList(newLbAddressGroups); - newBackendServers = Collections.unmodifiableList(newBackendServers); + List newBackendServers = + Collections.unmodifiableList(resolvedAddresses.getAddresses()); Map rawLbConfigValue = attributes.get(ATTR_LOAD_BALANCING_CONFIG); Mode newMode = retrieveModeFromLbConfig(rawLbConfigValue, helper.getChannelLogger()); if (!mode.equals(newMode)) { @@ -184,6 +185,11 @@ public void handleNameResolutionError(Status error) { } } + @Override + public boolean canHandleEmptyAddressListFromNameResolution() { + return true; + } + @VisibleForTesting @Nullable GrpclbState getGrpclbState() { diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index a88b840d4d4..fe23b99138e 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -151,16 +151,16 @@ public boolean shouldAccept(Runnable command) { private SubchannelPool subchannelPool; private final ArrayList logs = new ArrayList<>(); private final ChannelLogger channelLogger = new ChannelLogger() { - @Override - public void log(ChannelLogLevel level, String msg) { - logs.add(level + ": " + msg); - } + @Override + public void log(ChannelLogLevel level, String msg) { + logs.add(level + ": " + msg); + } - @Override - public void log(ChannelLogLevel level, String template, Object... args) { - log(level, MessageFormat.format(template, args)); - } - }; + @Override + public void log(ChannelLogLevel level, String template, Object... args) { + log(level, MessageFormat.format(template, args)); + } + }; private SubchannelPicker currentPicker; private LoadBalancerGrpc.LoadBalancerImplBase mockLbService; @Captor @@ -207,12 +207,12 @@ public StreamObserver balanceLoad( StreamObserver requestObserver = mock(StreamObserver.class); Answer closeRpc = new Answer() { - @Override - public Void answer(InvocationOnMock invocation) { - responseObserver.onCompleted(); - return null; - } - }; + @Override + public Void answer(InvocationOnMock invocation) { + responseObserver.onCompleted(); + return null; + } + }; doAnswer(closeRpc).when(requestObserver).onCompleted(); lbRequestObservers.add(requestObserver); return requestObserver; @@ -221,63 +221,63 @@ public Void answer(InvocationOnMock invocation) { fakeLbServer = InProcessServerBuilder.forName("fakeLb") .directExecutor().addService(mockLbService).build().start(); doAnswer(new Answer() { - @Override - public ManagedChannel answer(InvocationOnMock invocation) throws Throwable { - String authority = (String) invocation.getArguments()[1]; - ManagedChannel channel; - if (failingLbAuthorities.contains(authority)) { - channel = InProcessChannelBuilder.forName("nonExistFakeLb").directExecutor() - .overrideAuthority(authority).build(); - } else { - channel = InProcessChannelBuilder.forName("fakeLb").directExecutor() - .overrideAuthority(authority).build(); - } - fakeOobChannels.add(channel); - oobChannelTracker.add(channel); - return channel; + @Override + public ManagedChannel answer(InvocationOnMock invocation) throws Throwable { + String authority = (String) invocation.getArguments()[1]; + ManagedChannel channel; + if (failingLbAuthorities.contains(authority)) { + channel = InProcessChannelBuilder.forName("nonExistFakeLb").directExecutor() + .overrideAuthority(authority).build(); + } else { + channel = InProcessChannelBuilder.forName("fakeLb").directExecutor() + .overrideAuthority(authority).build(); } - }).when(helper).createOobChannel(any(EquivalentAddressGroup.class), any(String.class)); + fakeOobChannels.add(channel); + oobChannelTracker.add(channel); + return channel; + } + }).when(helper).createOobChannel(any(EquivalentAddressGroup.class), any(String.class)); doAnswer(new Answer() { - @Override - public Subchannel answer(InvocationOnMock invocation) throws Throwable { - Subchannel subchannel = mock(Subchannel.class); - EquivalentAddressGroup eag = (EquivalentAddressGroup) invocation.getArguments()[0]; - Attributes attrs = (Attributes) invocation.getArguments()[1]; - when(subchannel.getAllAddresses()).thenReturn(Arrays.asList(eag)); - when(subchannel.getAttributes()).thenReturn(attrs); - mockSubchannels.add(subchannel); - pooledSubchannelTracker.add(subchannel); - return subchannel; - } - }).when(subchannelPool).takeOrCreateSubchannel( - any(EquivalentAddressGroup.class), any(Attributes.class)); + @Override + public Subchannel answer(InvocationOnMock invocation) throws Throwable { + Subchannel subchannel = mock(Subchannel.class); + EquivalentAddressGroup eag = (EquivalentAddressGroup) invocation.getArguments()[0]; + Attributes attrs = (Attributes) invocation.getArguments()[1]; + when(subchannel.getAllAddresses()).thenReturn(Arrays.asList(eag)); + when(subchannel.getAttributes()).thenReturn(attrs); + mockSubchannels.add(subchannel); + pooledSubchannelTracker.add(subchannel); + return subchannel; + } + }).when(subchannelPool).takeOrCreateSubchannel( + any(EquivalentAddressGroup.class), any(Attributes.class)); doAnswer(new Answer() { - @Override - public Subchannel answer(InvocationOnMock invocation) throws Throwable { - Subchannel subchannel = mock(Subchannel.class); - List eagList = - (List) invocation.getArguments()[0]; - Attributes attrs = (Attributes) invocation.getArguments()[1]; - when(subchannel.getAllAddresses()).thenReturn(eagList); - when(subchannel.getAttributes()).thenReturn(attrs); - mockSubchannels.add(subchannel); - unpooledSubchannelTracker.add(subchannel); - return subchannel; - } - // TODO(zhangkun83): remove the deprecation suppression on this method once migrated to - // the new createSubchannel(). - }).when(helper).createSubchannel(any(List.class), any(Attributes.class)); + @Override + public Subchannel answer(InvocationOnMock invocation) throws Throwable { + Subchannel subchannel = mock(Subchannel.class); + List eagList = + (List) invocation.getArguments()[0]; + Attributes attrs = (Attributes) invocation.getArguments()[1]; + when(subchannel.getAllAddresses()).thenReturn(eagList); + when(subchannel.getAttributes()).thenReturn(attrs); + mockSubchannels.add(subchannel); + unpooledSubchannelTracker.add(subchannel); + return subchannel; + } + // TODO(zhangkun83): remove the deprecation suppression on this method once migrated to + // the new createSubchannel(). + }).when(helper).createSubchannel(any(List.class), any(Attributes.class)); when(helper.getSynchronizationContext()).thenReturn(syncContext); when(helper.getScheduledExecutorService()).thenReturn(fakeClock.getScheduledExecutorService()); when(helper.getChannelLogger()).thenReturn(channelLogger); doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - currentPicker = (SubchannelPicker) invocation.getArguments()[1]; - return null; - } - }).when(helper).updateBalancingState( - any(ConnectivityState.class), any(SubchannelPicker.class)); + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + currentPicker = (SubchannelPicker) invocation.getArguments()[1]; + return null; + } + }).when(helper).updateBalancingState( + any(ConnectivityState.class), any(SubchannelPicker.class)); when(helper.getAuthority()).thenReturn(SERVICE_AUTHORITY); when(backoffPolicy1.nextBackoffNanos()).thenReturn(10L, 100L); when(backoffPolicy2.nextBackoffNanos()).thenReturn(10L, 100L); @@ -293,11 +293,11 @@ public void tearDown() { try { if (balancer != null) { syncContext.execute(new Runnable() { - @Override - public void run() { - balancer.shutdown(); - } - }); + @Override + public void run() { + balancer.shutdown(); + } + }); } for (ManagedChannel channel : oobChannelTracker) { assertTrue(channel + " is shutdown", channel.isShutdown()); @@ -468,9 +468,11 @@ public void loadReporting() { when(args.getHeaders()).thenReturn(headers); long loadReportIntervalMillis = 1983; - List grpclbResolutionList = createResolvedServerAddresses(true); + List grpclbBalancerList = createResolvedBalancerAddresses(1); Attributes grpclbResolutionAttrs = Attributes.EMPTY; - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + deliverResolvedAddresses( + Collections.emptyList(), + grpclbBalancerList, grpclbResolutionAttrs); // Fallback timer is started as soon as address is resolved. assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); @@ -485,7 +487,7 @@ public void loadReporting() { inOrder.verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); // Simulate receiving LB response @@ -593,7 +595,7 @@ public void loadReporting() { .setLoadBalanceToken("token0003") .setNumCalls(1) // pick4 .build()) - .build()); + .build()); PickResult pick5 = picker.pickSubchannel(args); assertSame(subchannel1, pick1.getSubchannel()); @@ -641,7 +643,7 @@ public void loadReporting() { inOrder.verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); // Load reporting is also requested @@ -693,9 +695,11 @@ public void abundantInitialResponse() { PickSubchannelArgs args = mock(PickSubchannelArgs.class); when(args.getHeaders()).thenReturn(headers); - List grpclbResolutionList = createResolvedServerAddresses(true); + List grpclbBalancerList = createResolvedBalancerAddresses(1); Attributes grpclbResolutionAttrs = Attributes.EMPTY; - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + deliverResolvedAddresses( + Collections.emptyList(), + grpclbBalancerList, grpclbResolutionAttrs); assertEquals(1, fakeOobChannels.size()); verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); @@ -731,9 +735,11 @@ public void raceBetweenLoadReportingAndLbStreamClosure() { PickSubchannelArgs args = mock(PickSubchannelArgs.class); when(args.getHeaders()).thenReturn(headers); - List grpclbResolutionList = createResolvedServerAddresses(true); + List grpclbBalancerList = createResolvedBalancerAddresses(1); Attributes grpclbResolutionAttrs = Attributes.EMPTY; - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + deliverResolvedAddresses( + Collections.emptyList(), + grpclbBalancerList, grpclbResolutionAttrs); assertEquals(1, fakeOobChannels.size()); verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); @@ -743,7 +749,7 @@ public void raceBetweenLoadReportingAndLbStreamClosure() { inOrder.verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); // Simulate receiving LB response @@ -780,8 +786,8 @@ private void assertNextReport( eq(LoadBalanceRequest.newBuilder() .setClientStats( ClientStats.newBuilder(expectedReport) - .setTimestamp(Timestamps.fromNanos(fakeClock.getTicker().read())) - .build()) + .setTimestamp(Timestamps.fromNanos(fakeClock.getTicker().read())) + .build()) .build())); } @@ -803,11 +809,12 @@ public void nameResolutionFailsThenRecover() { assertThat(picker.pickList).containsExactly(new ErrorEntry(error)); // Recover with a subsequent success - List resolvedServers = createResolvedServerAddresses(true); - EquivalentAddressGroup eag = resolvedServers.get(0); + List grpclbBalancerList = createResolvedBalancerAddresses(1); + EquivalentAddressGroup eag = grpclbBalancerList.get(0); Attributes resolutionAttrs = Attributes.EMPTY; - deliverResolvedAddresses(resolvedServers, resolutionAttrs); + deliverResolvedAddresses( + Collections.emptyList(), grpclbBalancerList, resolutionAttrs); verify(helper).createOobChannel(eq(eag), eq(lbAuthority(0))); verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); @@ -817,11 +824,13 @@ public void nameResolutionFailsThenRecover() { public void grpclbThenNameResolutionFails() { InOrder inOrder = inOrder(helper, subchannelPool); // Go to GRPCLB first - List grpclbResolutionList = createResolvedServerAddresses(true); + List grpclbBalancerList = createResolvedBalancerAddresses(1); Attributes grpclbResolutionAttrs = Attributes.EMPTY; - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + deliverResolvedAddresses( + Collections.emptyList(), + grpclbBalancerList, grpclbResolutionAttrs); - verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0))); + verify(helper).createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0))); assertEquals(1, fakeOobChannels.size()); ManagedChannel oobChannel = fakeOobChannels.poll(); verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); @@ -854,59 +863,63 @@ public void grpclbThenNameResolutionFails() { @Test public void grpclbUpdatedAddresses_avoidsReconnect() { - List grpclbResolutionList = - createResolvedServerAddresses(true, false); + List backendList = createResolvedBackendAddresses(1); + List grpclbBalancerList = createResolvedBalancerAddresses(1); Attributes grpclbResolutionAttrs = Attributes.EMPTY; - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + deliverResolvedAddresses(backendList, grpclbBalancerList, grpclbResolutionAttrs); - verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0))); + verify(helper).createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0))); ManagedChannel oobChannel = fakeOobChannels.poll(); assertEquals(1, lbRequestObservers.size()); - List grpclbResolutionList2 = - createResolvedServerAddresses(true, false, true); + List backendList2 = createResolvedBackendAddresses(1); + List grpclbBalancerList2 = createResolvedBalancerAddresses(2); EquivalentAddressGroup combinedEag = new EquivalentAddressGroup(Arrays.asList( - grpclbResolutionList2.get(0).getAddresses().get(0), - grpclbResolutionList2.get(2).getAddresses().get(0)), + grpclbBalancerList2.get(0).getAddresses().get(0), + grpclbBalancerList2.get(1).getAddresses().get(0)), lbAttributes(lbAuthority(0))); - deliverResolvedAddresses(grpclbResolutionList2, grpclbResolutionAttrs); + deliverResolvedAddresses(backendList2, grpclbBalancerList2, grpclbResolutionAttrs); verify(helper).updateOobChannelAddresses(eq(oobChannel), eq(combinedEag)); assertEquals(1, lbRequestObservers.size()); // No additional RPC } @Test public void grpclbUpdatedAddresses_reconnectOnAuthorityChange() { - List grpclbResolutionList = - createResolvedServerAddresses(true, false); + List backendList = createResolvedBackendAddresses(1); + List grpclbBalancerList = createResolvedBalancerAddresses(1); Attributes grpclbResolutionAttrs = Attributes.EMPTY; - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + deliverResolvedAddresses(backendList, grpclbBalancerList, grpclbResolutionAttrs); - verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0))); + verify(helper).createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0))); ManagedChannel oobChannel = fakeOobChannels.poll(); assertEquals(1, lbRequestObservers.size()); final String newAuthority = "some-new-authority"; - List grpclbResolutionList2 = - createResolvedServerAddresses(false); - grpclbResolutionList2.add(new EquivalentAddressGroup( - new FakeSocketAddress("somethingNew"), lbAttributes(newAuthority))); - deliverResolvedAddresses(grpclbResolutionList2, grpclbResolutionAttrs); + List backendList2 = createResolvedBackendAddresses(1); + List grpclbBalancerList2 = + Collections.singletonList( + new EquivalentAddressGroup( + new FakeSocketAddress("somethingNew"), lbAttributes(newAuthority))); + deliverResolvedAddresses( + backendList2, grpclbBalancerList2, grpclbResolutionAttrs); assertTrue(oobChannel.isTerminated()); - verify(helper).createOobChannel(eq(grpclbResolutionList2.get(1)), eq(newAuthority)); + verify(helper).createOobChannel(eq(grpclbBalancerList2.get(0)), eq(newAuthority)); assertEquals(2, lbRequestObservers.size()); // An additional RPC } @Test public void grpclbWorking() { InOrder inOrder = inOrder(helper, subchannelPool); - List grpclbResolutionList = createResolvedServerAddresses(true); + List grpclbBalancerList = createResolvedBalancerAddresses(1); Attributes grpclbResolutionAttrs = Attributes.EMPTY; - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + deliverResolvedAddresses( + Collections.emptyList(), + grpclbBalancerList, grpclbResolutionAttrs); // Fallback timer is started as soon as the addresses are resolved. assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); - verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0))); + verify(helper).createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0))); assertEquals(1, fakeOobChannels.size()); ManagedChannel oobChannel = fakeOobChannels.poll(); verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); @@ -915,7 +928,7 @@ public void grpclbWorking() { StreamObserver lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); // Simulate receiving LB response @@ -1174,13 +1187,14 @@ private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) { long loadReportIntervalMillis = 1983; InOrder inOrder = inOrder(helper, subchannelPool); - // Create a resolution list with a mixture of balancer and backend addresses - List resolutionList = - createResolvedServerAddresses(false, true, false); + // Create balancer and backend addresses + List backendList = createResolvedBackendAddresses(2); + List grpclbBalancerList = createResolvedBalancerAddresses(1); Attributes resolutionAttrs = Attributes.EMPTY; - deliverResolvedAddresses(resolutionList, resolutionAttrs); + deliverResolvedAddresses(backendList, grpclbBalancerList, resolutionAttrs); - inOrder.verify(helper).createOobChannel(eq(resolutionList.get(1)), eq(lbAuthority(0))); + inOrder.verify(helper) + .createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0))); // Attempted to connect to balancer assertEquals(1, fakeOobChannels.size()); @@ -1192,7 +1206,7 @@ private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) { verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); // We don't care if these methods have been run. @@ -1214,11 +1228,11 @@ private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) { assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); List fallbackList = - Arrays.asList(resolutionList.get(0), resolutionList.get(2)); + Arrays.asList(backendList.get(0), backendList.get(1)); assertThat(logs).containsExactly( "INFO: Using fallback backends", "INFO: Using RR list=[[[FakeSocketAddress-fake-address-0]/{}], " - + "[[FakeSocketAddress-fake-address-2]/{}]], drop=[null, null]", + + "[[FakeSocketAddress-fake-address-1]/{}]], drop=[null, null]", "INFO: CONNECTING: picks=[BUFFER_ENTRY], drops=[null, null]").inOrder(); // Fall back to the backends from resolver @@ -1228,20 +1242,21 @@ private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) { verify(lbRequestObserver, never()).onCompleted(); } - //////////////////////////////////////////////////////// - // Name resolver sends new list without any backend addr - //////////////////////////////////////////////////////// - resolutionList = createResolvedServerAddresses(true, true); - deliverResolvedAddresses(resolutionList, resolutionAttrs); + ////////////////////////////////////////////////////////////////////// + // Name resolver sends new resolution results without any backend addr + ////////////////////////////////////////////////////////////////////// + grpclbBalancerList = createResolvedBalancerAddresses(2); + deliverResolvedAddresses( + Collections.emptyList(),grpclbBalancerList, resolutionAttrs); // New addresses are updated to the OobChannel inOrder.verify(helper).updateOobChannelAddresses( same(oobChannel), eq(new EquivalentAddressGroup( - Arrays.asList( - resolutionList.get(0).getAddresses().get(0), - resolutionList.get(1).getAddresses().get(0)), - lbAttributes(lbAuthority(0))))); + Arrays.asList( + grpclbBalancerList.get(0).getAddresses().get(0), + grpclbBalancerList.get(1).getAddresses().get(0)), + lbAttributes(lbAuthority(0))))); if (timerExpires) { // Still in fallback logic, except that the backend list is empty @@ -1249,21 +1264,22 @@ private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) { inOrder, Collections.emptyList()); } - ////////////////////////////////////////////////// - // Name resolver sends new list with backend addrs - ////////////////////////////////////////////////// - resolutionList = createResolvedServerAddresses(true, false, false); - deliverResolvedAddresses(resolutionList, resolutionAttrs); + //////////////////////////////////////////////////////////////// + // Name resolver sends new resolution results with backend addrs + //////////////////////////////////////////////////////////////// + backendList = createResolvedBackendAddresses(2); + grpclbBalancerList = createResolvedBalancerAddresses(1); + deliverResolvedAddresses(backendList, grpclbBalancerList, resolutionAttrs); // New LB address is updated to the OobChannel inOrder.verify(helper).updateOobChannelAddresses( same(oobChannel), - eq(resolutionList.get(0))); + eq(grpclbBalancerList.get(0))); if (timerExpires) { // New backend addresses are used for fallback fallbackTestVerifyUseOfFallbackBackendLists( - inOrder, Arrays.asList(resolutionList.get(1), resolutionList.get(2))); + inOrder, Arrays.asList(backendList.get(0), backendList.get(1))); } //////////////////////////////////////////////// @@ -1283,7 +1299,7 @@ private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) { lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); } @@ -1302,8 +1318,9 @@ private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) { /////////////////////////////////////////////////////////////// // New backend addresses from resolver outside of fallback mode /////////////////////////////////////////////////////////////// - resolutionList = createResolvedServerAddresses(true, false); - deliverResolvedAddresses(resolutionList, resolutionAttrs); + backendList = createResolvedBackendAddresses(1); + grpclbBalancerList = createResolvedBalancerAddresses(1); + deliverResolvedAddresses(backendList, grpclbBalancerList, resolutionAttrs); // Will not affect the round robin list at all inOrder.verify(helper, never()) .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); @@ -1317,13 +1334,13 @@ public void grpclbFallback_breakLbStreamBeforeFallbackTimerExpires() { long loadReportIntervalMillis = 1983; InOrder inOrder = inOrder(helper, subchannelPool); - // Create a resolution list with a mixture of balancer and backend addresses - List resolutionList = - createResolvedServerAddresses(false, true, false); + // Create balancer and backend addresses + List backendList = createResolvedBackendAddresses(2); + List grpclbBalancerList = createResolvedBalancerAddresses(1); Attributes resolutionAttrs = Attributes.EMPTY; - deliverResolvedAddresses(resolutionList, resolutionAttrs); + deliverResolvedAddresses(backendList, grpclbBalancerList, resolutionAttrs); - inOrder.verify(helper).createOobChannel(eq(resolutionList.get(1)), eq(lbAuthority(0))); + inOrder.verify(helper).createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0))); // Attempted to connect to balancer assertThat(fakeOobChannels).hasSize(1); @@ -1334,7 +1351,7 @@ public void grpclbFallback_breakLbStreamBeforeFallbackTimerExpires() { verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); // We don't care if these methods have been run. @@ -1353,7 +1370,7 @@ public void grpclbFallback_breakLbStreamBeforeFallbackTimerExpires() { // Fall back to the backends from resolver fallbackTestVerifyUseOfFallbackBackendLists( - inOrder, Arrays.asList(resolutionList.get(0), resolutionList.get(2))); + inOrder, Arrays.asList(backendList.get(0), backendList.get(1))); // A new stream is created verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); @@ -1361,7 +1378,7 @@ public void grpclbFallback_breakLbStreamBeforeFallbackTimerExpires() { lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); } @@ -1369,19 +1386,20 @@ public void grpclbFallback_breakLbStreamBeforeFallbackTimerExpires() { public void grpclbFallback_noBalancerAddress() { InOrder inOrder = inOrder(helper, subchannelPool); - // Create a resolution list with just backend addresses - List resolutionList = createResolvedServerAddresses(false, false); + // Create just backend addresses + List backendList = createResolvedBackendAddresses(2); Attributes resolutionAttrs = Attributes.EMPTY; - deliverResolvedAddresses(resolutionList, resolutionAttrs); + deliverResolvedAddresses( + backendList, Collections.emptyList(), resolutionAttrs); assertThat(logs).containsExactly( - "INFO: Using fallback backends", - "INFO: Using RR list=[[[FakeSocketAddress-fake-address-0]/{}], " - + "[[FakeSocketAddress-fake-address-1]/{}]], drop=[null, null]", - "INFO: CONNECTING: picks=[BUFFER_ENTRY], drops=[null, null]").inOrder(); + "INFO: Using fallback backends", + "INFO: Using RR list=[[[FakeSocketAddress-fake-address-0]/{}], " + + "[[FakeSocketAddress-fake-address-1]/{}]], drop=[null, null]", + "INFO: CONNECTING: picks=[BUFFER_ENTRY], drops=[null, null]").inOrder(); // Fall back to the backends from resolver - fallbackTestVerifyUseOfFallbackBackendLists(inOrder, resolutionList); + fallbackTestVerifyUseOfFallbackBackendLists(inOrder, backendList); // No fallback timeout timer scheduled. assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); @@ -1410,13 +1428,13 @@ private void subtestGrpclbFallbackConnectionLost( long loadReportIntervalMillis = 1983; InOrder inOrder = inOrder(helper, mockLbService, subchannelPool); - // Create a resolution list with a mixture of balancer and backend addresses - List resolutionList = - createResolvedServerAddresses(false, true, false); + // Create balancer and backend addresses + List backendList = createResolvedBackendAddresses(2); + List grpclbBalancerList = createResolvedBalancerAddresses(1); Attributes resolutionAttrs = Attributes.EMPTY; - deliverResolvedAddresses(resolutionList, resolutionAttrs); + deliverResolvedAddresses(backendList, grpclbBalancerList, resolutionAttrs); - inOrder.verify(helper).createOobChannel(eq(resolutionList.get(1)), eq(lbAuthority(0))); + inOrder.verify(helper).createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0))); // Attempted to connect to balancer assertEquals(1, fakeOobChannels.size()); @@ -1428,7 +1446,7 @@ private void subtestGrpclbFallbackConnectionLost( verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); // We don't care if these methods have been run. @@ -1465,7 +1483,7 @@ private void subtestGrpclbFallbackConnectionLost( if (balancerBroken && allSubchannelsBroken) { // Going into fallback subchannels = fallbackTestVerifyUseOfFallbackBackendLists( - inOrder, Arrays.asList(resolutionList.get(0), resolutionList.get(2))); + inOrder, Arrays.asList(backendList.get(0), backendList.get(1))); // When in fallback mode, fallback timer should not be scheduled when all backend // connections are lost @@ -1486,9 +1504,9 @@ private void subtestGrpclbFallbackConnectionLost( if (!(balancerBroken && allSubchannelsBroken)) { verify(subchannelPool, never()).takeOrCreateSubchannel( - eq(resolutionList.get(0)), any(Attributes.class)); + eq(backendList.get(0)), any(Attributes.class)); verify(subchannelPool, never()).takeOrCreateSubchannel( - eq(resolutionList.get(2)), any(Attributes.class)); + eq(backendList.get(1)), any(Attributes.class)); } } @@ -1555,15 +1573,15 @@ private List fallbackTestVerifyUseOfBackendLists( @Test public void grpclbMultipleAuthorities() throws Exception { - List grpclbResolutionList = Arrays.asList( + List backendList = Collections.singletonList( + new EquivalentAddressGroup(new FakeSocketAddress("not-a-lb-address"))); + List grpclbBalancerList = Arrays.asList( new EquivalentAddressGroup( new FakeSocketAddress("fake-address-1"), lbAttributes("fake-authority-1")), new EquivalentAddressGroup( new FakeSocketAddress("fake-address-2"), lbAttributes("fake-authority-2")), - new EquivalentAddressGroup( - new FakeSocketAddress("not-a-lb-address")), new EquivalentAddressGroup( new FakeSocketAddress("fake-address-3"), lbAttributes("fake-authority-1"))); @@ -1574,7 +1592,7 @@ public void grpclbMultipleAuthorities() throws Exception { lbAttributes("fake-authority-1")); // Supporting multiple authorities would be good, one day Attributes grpclbResolutionAttrs = Attributes.EMPTY; - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + deliverResolvedAddresses(backendList, grpclbBalancerList, grpclbResolutionAttrs); verify(helper).createOobChannel(goldenOobChannelEag, "fake-authority-1"); } @@ -1588,9 +1606,11 @@ public void grpclbBalancerStreamClosedAndRetried() throws Exception { .build(); InOrder inOrder = inOrder(mockLbService, backoffPolicyProvider, backoffPolicy1, backoffPolicy2, helper); - List grpclbResolutionList = createResolvedServerAddresses(true); + List grpclbBalancerList = createResolvedBalancerAddresses(1); Attributes grpclbResolutionAttrs = Attributes.EMPTY; - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + deliverResolvedAddresses( + Collections.emptyList(), + grpclbBalancerList, grpclbResolutionAttrs); assertEquals(1, fakeOobChannels.size()); @SuppressWarnings("unused") @@ -1693,11 +1713,13 @@ public void grpclbWorking_pickFirstMode() throws Exception { InOrder inOrder = inOrder(helper); String lbConfig = "{\"childPolicy\" : [ {\"pick_first\" : {}} ]}"; - List grpclbResolutionList = createResolvedServerAddresses(true); + List grpclbBalancerList = createResolvedBalancerAddresses(1); Attributes grpclbResolutionAttrs = Attributes.newBuilder().set( LoadBalancer.ATTR_LOAD_BALANCING_CONFIG, parseJsonObject(lbConfig)).build(); - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + deliverResolvedAddresses( + Collections.emptyList(), + grpclbBalancerList, grpclbResolutionAttrs); assertEquals(1, fakeOobChannels.size()); verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); @@ -1706,7 +1728,7 @@ public void grpclbWorking_pickFirstMode() throws Exception { StreamObserver lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); // Simulate receiving LB response @@ -1722,8 +1744,8 @@ public void grpclbWorking_pickFirstMode() throws Exception { // the new createSubchannel(). inOrder.verify(helper).createSubchannel( eq(Arrays.asList( - new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), - new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002")))), + new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), + new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002")))), any(Attributes.class)); // Initially IDLE @@ -1779,9 +1801,9 @@ public void grpclbWorking_pickFirstMode() throws Exception { assertThat(mockSubchannels).isEmpty(); verify(subchannel).updateAddresses( eq(Arrays.asList( - new EquivalentAddressGroup(backends2.get(0).addr, eagAttrsWithToken("token0001")), - new EquivalentAddressGroup(backends2.get(2).addr, - eagAttrsWithToken("token0004"))))); + new EquivalentAddressGroup(backends2.get(0).addr, eagAttrsWithToken("token0001")), + new EquivalentAddressGroup(backends2.get(2).addr, + eagAttrsWithToken("token0004"))))); inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue(); assertThat(picker4.dropList).containsExactly( @@ -1825,10 +1847,12 @@ public void shutdownWithoutSubchannel_pickFirst() throws Exception { @SuppressWarnings("deprecation") // TODO(creamsoup) use parsed object private void subtestShutdownWithoutSubchannel(String childPolicy) throws Exception { String lbConfig = "{\"childPolicy\" : [ {\"" + childPolicy + "\" : {}} ]}"; - List grpclbResolutionList = createResolvedServerAddresses(true); + List grpclbBalancerList = createResolvedBalancerAddresses(1); Attributes grpclbResolutionAttrs = Attributes.newBuilder().set( LoadBalancer.ATTR_LOAD_BALANCING_CONFIG, parseJsonObject(lbConfig)).build(); - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + deliverResolvedAddresses( + Collections.emptyList(), + grpclbBalancerList, grpclbResolutionAttrs); verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); assertEquals(1, lbRequestObservers.size()); StreamObserver requestObserver = lbRequestObservers.poll(); @@ -1848,12 +1872,12 @@ public void pickFirstMode_fallback() throws Exception { String lbConfig = "{\"childPolicy\" : [ {\"pick_first\" : {}} ]}"; - // Name resolver returns a mix of balancer and backend addresses - List grpclbResolutionList = - createResolvedServerAddresses(false, true, false); + // Name resolver returns balancer and backend addresses + List backendList = createResolvedBackendAddresses(2); + List grpclbBalancerList = createResolvedBalancerAddresses(1); Attributes grpclbResolutionAttrs = Attributes.newBuilder().set( LoadBalancer.ATTR_LOAD_BALANCING_CONFIG, parseJsonObject(lbConfig)).build(); - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + deliverResolvedAddresses(backendList, grpclbBalancerList, grpclbResolutionAttrs); // Attempted to connect to balancer assertEquals(1, fakeOobChannels.size()); @@ -1868,7 +1892,7 @@ public void pickFirstMode_fallback() throws Exception { // TODO(zhangkun83): remove the deprecation suppression on this method once migrated to // the new createSubchannel(). inOrder.verify(helper).createSubchannel( - eq(Arrays.asList(grpclbResolutionList.get(0), grpclbResolutionList.get(2))), + eq(Arrays.asList(backendList.get(0), backendList.get(1))), any(Attributes.class)); assertThat(mockSubchannels).hasSize(1); @@ -1905,9 +1929,9 @@ public void pickFirstMode_fallback() throws Exception { assertThat(mockSubchannels).isEmpty(); verify(subchannel).updateAddresses( eq(Arrays.asList( - new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), - new EquivalentAddressGroup(backends1.get(1).addr, - eagAttrsWithToken("token0002"))))); + new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), + new EquivalentAddressGroup(backends1.get(1).addr, + eagAttrsWithToken("token0002"))))); inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue(); assertThat(picker2.dropList).containsExactly(null, null); @@ -1927,11 +1951,13 @@ public void switchMode() throws Exception { InOrder inOrder = inOrder(helper); String lbConfig = "{\"childPolicy\" : [ {\"round_robin\" : {}} ]}"; - List grpclbResolutionList = createResolvedServerAddresses(true); + List grpclbBalancerList = createResolvedBalancerAddresses(1); Attributes grpclbResolutionAttrs = Attributes.newBuilder().set( LoadBalancer.ATTR_LOAD_BALANCING_CONFIG, parseJsonObject(lbConfig)).build(); - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + deliverResolvedAddresses( + Collections.emptyList(), + grpclbBalancerList, grpclbResolutionAttrs); assertEquals(1, fakeOobChannels.size()); ManagedChannel oobChannel = fakeOobChannels.poll(); @@ -1941,7 +1967,7 @@ public void switchMode() throws Exception { StreamObserver lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); // Simulate receiving LB response @@ -1971,7 +1997,9 @@ public void switchMode() throws Exception { lbConfig = "{\"childPolicy\" : [ {\"pick_first\" : {}} ]}"; grpclbResolutionAttrs = Attributes.newBuilder().set( LoadBalancer.ATTR_LOAD_BALANCING_CONFIG, parseJsonObject(lbConfig)).build(); - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + deliverResolvedAddresses( + Collections.emptyList(), + grpclbBalancerList, grpclbResolutionAttrs); // GrpclbState will be shutdown, and a new one will be created @@ -1989,7 +2017,7 @@ public void switchMode() throws Exception { lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); // Simulate receiving LB response @@ -2003,8 +2031,8 @@ public void switchMode() throws Exception { // the new createSubchannel(). inOrder.verify(helper).createSubchannel( eq(Arrays.asList( - new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), - new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002")))), + new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), + new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002")))), any(Attributes.class)); inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); @@ -2088,19 +2116,18 @@ public void retrieveModeFromLbConfig_badConfigDefaultToRoundRobin() throws Excep @Test public void grpclbWorking_lbSendsFallbackMessage() { InOrder inOrder = inOrder(helper, subchannelPool); - List grpclbResolutionList = - createResolvedServerAddresses(true, true, false, false); - List fallbackEags = grpclbResolutionList.subList(2, 4); + List backendList = createResolvedBackendAddresses(2); + List grpclbBalancerList = createResolvedBalancerAddresses(2); Attributes grpclbResolutionAttrs = Attributes.EMPTY; - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + deliverResolvedAddresses(backendList, grpclbBalancerList, grpclbResolutionAttrs); // Fallback timer is started as soon as the addresses are resolved. assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); List addrs = new ArrayList<>(); - addrs.addAll(grpclbResolutionList.get(0).getAddresses()); - addrs.addAll(grpclbResolutionList.get(1).getAddresses()); - Attributes attr = grpclbResolutionList.get(0).getAttributes(); + addrs.addAll(grpclbBalancerList.get(0).getAddresses()); + addrs.addAll(grpclbBalancerList.get(1).getAddresses()); + Attributes attr = grpclbBalancerList.get(0).getAttributes(); EquivalentAddressGroup oobChannelEag = new EquivalentAddressGroup(addrs, attr); verify(helper).createOobChannel(eq(oobChannelEag), eq(lbAuthority(0))); assertEquals(1, fakeOobChannels.size()); @@ -2206,7 +2233,7 @@ public void grpclbWorking_lbSendsFallbackMessage() { .returnSubchannel(eq(subchannel2), eq(ConnectivityStateInfo.forNonError(READY))); // verify fallback - fallbackTestVerifyUseOfFallbackBackendLists(inOrder, fallbackEags); + fallbackTestVerifyUseOfFallbackBackendLists(inOrder, backendList); assertFalse(oobChannel.isShutdown()); verify(lbRequestObserver, never()).onCompleted(); @@ -2293,48 +2320,60 @@ public void grpclbWorking_lbSendsFallbackMessage() { private void deliverSubchannelState( final Subchannel subchannel, final ConnectivityStateInfo newState) { syncContext.execute(new Runnable() { - @Override - public void run() { - // TODO(zhangkun83): remove the deprecation suppression on this method once migrated to - // the new API. - balancer.handleSubchannelState(subchannel, newState); - } - }); + @Override + public void run() { + // TODO(zhangkun83): remove the deprecation suppression on this method once migrated to + // the new API. + balancer.handleSubchannelState(subchannel, newState); + } + }); } private void deliverNameResolutionError(final Status error) { syncContext.execute(new Runnable() { - @Override - public void run() { - balancer.handleNameResolutionError(error); - } - }); + @Override + public void run() { + balancer.handleNameResolutionError(error); + } + }); } private void deliverResolvedAddresses( - final List addrs, final Attributes attrs) { + final List backendAddrs, + final List balancerAddrs, + Attributes attrs) { + final Attributes attributes = + attrs.toBuilder().set(GrpclbConstants.ATTR_LB_ADDRS, balancerAddrs).build(); syncContext.execute(new Runnable() { - @Override - public void run() { - balancer.handleResolvedAddresses( - ResolvedAddresses.newBuilder().setAddresses(addrs).setAttributes(attrs).build()); - } - }); + @Override + public void run() { + balancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(backendAddrs) + .setAttributes(attributes) + .build()); + } + }); } private GrpclbClientLoadRecorder getLoadRecorder() { return balancer.getGrpclbState().getLoadRecorder(); } - private static List createResolvedServerAddresses(boolean ... isLb) { - ArrayList list = new ArrayList<>(); - for (int i = 0; i < isLb.length; i++) { + private static List createResolvedBackendAddresses(int n) { + List list = new ArrayList<>(); + for (int i = 0; i < n; i++) { + SocketAddress addr = new FakeSocketAddress("fake-address-" + i); + list.add(new EquivalentAddressGroup(addr)); + } + return list; + } + + private static List createResolvedBalancerAddresses(int n) { + List list = new ArrayList<>(); + for (int i = 0; i < n; i++) { SocketAddress addr = new FakeSocketAddress("fake-address-" + i); - EquivalentAddressGroup eag = - new EquivalentAddressGroup( - addr, - isLb[i] ? lbAttributes(lbAuthority(i)) : Attributes.EMPTY); - list.add(eag); + list.add(new EquivalentAddressGroup(addr, lbAttributes(lbAuthority(i)))); } return list; } From c817b6f38f3e6079a834ef830642450512777792 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Mon, 27 Jan 2020 10:03:06 -0800 Subject: [PATCH 2/7] Add annotation for LB addrs attributes. --- core/src/main/java/io/grpc/internal/GrpcAttributes.java | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/java/io/grpc/internal/GrpcAttributes.java b/core/src/main/java/io/grpc/internal/GrpcAttributes.java index 5f4618d9ea8..2ff4e7cff99 100644 --- a/core/src/main/java/io/grpc/internal/GrpcAttributes.java +++ b/core/src/main/java/io/grpc/internal/GrpcAttributes.java @@ -38,6 +38,7 @@ public final class GrpcAttributes { public static final Attributes.Key> NAME_RESOLVER_SERVICE_CONFIG = Attributes.Key.create("service-config"); + @NameResolver.ResolutionResultAttr public static final Attributes.Key> ATTR_LB_ADDRS = Attributes.Key.create("io.grpc.grpclb.lbAddrs"); From 3fccafe4b573c1fc68fdb0422e48f2d704bbb086 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Tue, 28 Jan 2020 01:33:28 -0800 Subject: [PATCH 3/7] Resolver can return empty result and let LB policy handle the error of no usable addresses. --- .../io/grpc/internal/DnsNameResolver.java | 19 +++---- .../io/grpc/internal/DnsNameResolverTest.java | 54 +++++++++++-------- .../io/grpc/grpclb/GrpclbLoadBalancer.java | 9 +++- .../grpc/grpclb/GrpclbLoadBalancerTest.java | 14 +++++ 4 files changed, 62 insertions(+), 34 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/DnsNameResolver.java b/core/src/main/java/io/grpc/internal/DnsNameResolver.java index 150f962738d..76f82ed1609 100644 --- a/core/src/main/java/io/grpc/internal/DnsNameResolver.java +++ b/core/src/main/java/io/grpc/internal/DnsNameResolver.java @@ -293,24 +293,19 @@ public void run() { Status.UNAVAILABLE.withDescription("Unable to resolve host " + host).withCause(e)); return; } - if (resolutionResults.addresses.isEmpty() && resolutionResults.balancerAddresses.isEmpty()) { - savedListener.onError(Status.UNAVAILABLE.withDescription( - "No DNS backend or balancer addresses found for " + host)); - return; - } // Each address forms an EAG List servers = new ArrayList<>(); for (InetAddress inetAddr : resolutionResults.addresses) { servers.add(new EquivalentAddressGroup(new InetSocketAddress(inetAddr, port))); } - ResolutionResult.Builder resultBuilder = - ResolutionResult.newBuilder() - .setAddresses(servers) - .setAttributes( - Attributes.newBuilder() - .set(GrpcAttributes.ATTR_LB_ADDRS, resolutionResults.balancerAddresses) - .build()); + ResolutionResult.Builder resultBuilder = ResolutionResult.newBuilder().setAddresses(servers); + if (!resolutionResults.balancerAddresses.isEmpty()) { + resultBuilder.setAttributes( + Attributes.newBuilder() + .set(GrpcAttributes.ATTR_LB_ADDRS, resolutionResults.balancerAddresses) + .build()); + } if (!resolutionResults.txtRecords.isEmpty()) { ConfigOrError rawServiceConfig = parseServiceConfig(resolutionResults.txtRecords, random, getLocalHostname()); diff --git a/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java b/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java index bfe97e21982..0d3b26ed180 100644 --- a/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java +++ b/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java @@ -47,7 +47,6 @@ import io.grpc.ProxyDetector; import io.grpc.StaticTestingClassLoader; import io.grpc.Status; -import io.grpc.Status.Code; import io.grpc.SynchronizationContext; import io.grpc.internal.DnsNameResolver.AddressResolver; import io.grpc.internal.DnsNameResolver.ResolutionResults; @@ -381,26 +380,6 @@ public void execute(Runnable command) { assertThat(executions.get()).isEqualTo(1); } - @Test - public void resolveAll_failsOnEmptyResult() { - DnsNameResolver nr = newResolver("dns:///addr.fake:1234", 443); - nr.setAddressResolver(new AddressResolver() { - @Override - public List resolveAddress(String host) throws Exception { - return Collections.emptyList(); - } - }); - - nr.start(mockListener); - assertThat(fakeExecutor.runDueTasks()).isEqualTo(1); - - ArgumentCaptor ac = ArgumentCaptor.forClass(Status.class); - verify(mockListener).onError(ac.capture()); - verifyNoMoreInteractions(mockListener); - assertThat(ac.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); - assertThat(ac.getValue().getDescription()).contains("No DNS backend or balancer addresses"); - } - @Test public void resolve_cacheForever() throws Exception { System.setProperty(DnsNameResolver.NETWORKADDRESS_CACHE_TTL_PROPERTY, "-1"); @@ -549,6 +528,39 @@ private void resolveDefaultValue() throws Exception { verify(mockResolver, times(2)).resolveAddress(anyString()); } + @Test + public void resolve_emptyResult() { + DnsNameResolver nr = newResolver("dns:///addr.fake:1234", 443); + nr.setAddressResolver(new AddressResolver() { + @Override + public List resolveAddress(String host) throws Exception { + return Collections.emptyList(); + } + }); + nr.setResourceResolver(new ResourceResolver() { + @Override + public List resolveTxt(String host) throws Exception { + return Collections.emptyList(); + } + + @Override + public List resolveSrv(AddressResolver addressResolver, String host) + throws Exception { + return Collections.emptyList(); + } + }); + + nr.start(mockListener); + assertThat(fakeExecutor.runDueTasks()).isEqualTo(1); + + ArgumentCaptor ac = ArgumentCaptor.forClass(ResolutionResult.class); + verify(mockListener).onResult(ac.capture()); + verifyNoMoreInteractions(mockListener); + assertThat(ac.getValue().getAddresses()).isEmpty(); + assertThat(ac.getValue().getAttributes()).isEqualTo(Attributes.EMPTY); + assertThat(ac.getValue().getServiceConfig()).isNull(); + } + @Test public void resolve_balancerAddrsAsAttributes() throws Exception { InetAddress backendAddr = InetAddress.getByAddress(new byte[] {127, 0, 0, 0}); diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java index 782f423ed5c..df9282e32bc 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java @@ -92,8 +92,15 @@ public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo n @SuppressWarnings("deprecation") // TODO(creamsoup) migrate to use parsed service config public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { Attributes attributes = resolvedAddresses.getAttributes(); - List newLbAddressGroups = new ArrayList<>(); List newLbAddresses = attributes.get(GrpcAttributes.ATTR_LB_ADDRS); + if ((newLbAddresses == null || newLbAddresses.isEmpty()) + && resolvedAddresses.getAddresses().isEmpty()) { + handleNameResolutionError( + Status.UNAVAILABLE.withDescription("No backend or balancer addresses found")); + return; + } + List newLbAddressGroups = new ArrayList<>(); + if (newLbAddresses != null) { for (EquivalentAddressGroup lbAddr : newLbAddresses) { String lbAddrAuthority = lbAddr.getAttributes().get(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY); diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index ebc8c774be9..21ea066b2fe 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -791,6 +791,20 @@ private void assertNextReport( .build())); } + @Test + public void receiveNoBackendAndBalancerAddress() { + deliverResolvedAddresses( + Collections.emptyList(), + Collections.emptyList(), + Attributes.EMPTY); + verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(picker.dropList).isEmpty(); + Status error = Iterables.getOnlyElement(picker.pickList).picked(new Metadata()).getStatus(); + assertThat(error.getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(error.getDescription()).isEqualTo("No backend or balancer addresses found"); + } + @Test public void nameResolutionFailsThenRecover() { Status error = Status.NOT_FOUND.withDescription("www.google.com not found"); From f02a7fab6dc898c853168bc414afb861d6a43f89 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Tue, 28 Jan 2020 10:05:11 -0800 Subject: [PATCH 4/7] Do not set ATTR_LB_ADDRS for test if resolved addresses do not contain balancer addresses, this mirrors the real behavior of DnsNameResolver. --- .../test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index 21ea066b2fe..4e0f1063937 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -2356,15 +2356,17 @@ private void deliverResolvedAddresses( final List backendAddrs, final List balancerAddrs, Attributes attrs) { - final Attributes attributes = - attrs.toBuilder().set(GrpclbConstants.ATTR_LB_ADDRS, balancerAddrs).build(); + if (!balancerAddrs.isEmpty()) { + attrs = attrs.toBuilder().set(GrpclbConstants.ATTR_LB_ADDRS, balancerAddrs).build(); + } + final Attributes finalAttrs = attrs; syncContext.execute(new Runnable() { @Override public void run() { balancer.handleResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(backendAddrs) - .setAttributes(attributes) + .setAttributes(finalAttrs) .build()); } }); From 19d2d7c0100a4f0d8e8fb90900d4e931294bd63a Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Tue, 28 Jan 2020 10:06:37 -0800 Subject: [PATCH 5/7] Fix attributes being overwritten. --- .../java/io/grpc/internal/DnsNameResolver.java | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/DnsNameResolver.java b/core/src/main/java/io/grpc/internal/DnsNameResolver.java index 76f82ed1609..9a6224d4ac7 100644 --- a/core/src/main/java/io/grpc/internal/DnsNameResolver.java +++ b/core/src/main/java/io/grpc/internal/DnsNameResolver.java @@ -300,11 +300,9 @@ public void run() { } ResolutionResult.Builder resultBuilder = ResolutionResult.newBuilder().setAddresses(servers); + Attributes.Builder attributesBuilder = Attributes.newBuilder(); if (!resolutionResults.balancerAddresses.isEmpty()) { - resultBuilder.setAttributes( - Attributes.newBuilder() - .set(GrpcAttributes.ATTR_LB_ADDRS, resolutionResults.balancerAddresses) - .build()); + attributesBuilder.set(GrpcAttributes.ATTR_LB_ADDRS, resolutionResults.balancerAddresses); } if (!resolutionResults.txtRecords.isEmpty()) { ConfigOrError rawServiceConfig = @@ -319,17 +317,14 @@ public void run() { Map verifiedRawServiceConfig = (Map) rawServiceConfig.getConfig(); ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(verifiedRawServiceConfig); - resultBuilder - .setAttributes( - Attributes.newBuilder() - .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, verifiedRawServiceConfig) - .build()) - .setServiceConfig(parsedServiceConfig); + resultBuilder.setServiceConfig(parsedServiceConfig); + attributesBuilder + .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, verifiedRawServiceConfig); } } else { logger.log(Level.FINE, "No TXT records found for {0}", new Object[]{host}); } - savedListener.onResult(resultBuilder.build()); + savedListener.onResult(resultBuilder.setAttributes(attributesBuilder.build()).build()); } } From 5adefa9b5760870406d41bf4d9beedde11819a9a Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Tue, 28 Jan 2020 10:07:05 -0800 Subject: [PATCH 6/7] Delete no longer used class. --- .../AutoConfiguredLoadBalancerFactory.java | 21 ------------------- 1 file changed, 21 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java b/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java index 466999b37bc..e0d7e445fca 100644 --- a/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java +++ b/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java @@ -41,7 +41,6 @@ import io.grpc.Status; import io.grpc.internal.ServiceConfigUtil.LbConfig; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import javax.annotation.Nullable; @@ -333,26 +332,6 @@ public String toString() { } } - @VisibleForTesting - static final class ResolvedPolicySelection { - final PolicySelection policySelection; - final List serverList; - - ResolvedPolicySelection( - PolicySelection policySelection, List serverList) { - this.policySelection = checkNotNull(policySelection, "policySelection"); - this.serverList = Collections.unmodifiableList(checkNotNull(serverList, "serverList")); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("policySelection", policySelection) - .add("serverList", serverList) - .toString(); - } - } - private static final class EmptyPicker extends SubchannelPicker { @Override From 08cb3d8c56becf15c8d77032ce8206444a99557e Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Wed, 29 Jan 2020 16:56:04 -0800 Subject: [PATCH 7/7] Add annotation for attribute keys being depracated. --- .../io/grpc/alts/internal/AltsProtocolNegotiator.java | 1 + core/src/main/java/io/grpc/internal/GrpcAttributes.java | 9 +++++++++ .../io/grpc/internal/JndiResourceResolverFactory.java | 1 + .../test/java/io/grpc/internal/DnsNameResolverTest.java | 1 + .../java/io/grpc/internal/JndiResourceResolverTest.java | 2 +- grpclb/src/main/java/io/grpc/grpclb/GrpclbConstants.java | 2 ++ .../src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java | 2 +- grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java | 2 +- .../test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java | 2 +- xds/src/main/java/io/grpc/xds/FallbackLb.java | 3 +++ 10 files changed, 21 insertions(+), 4 deletions(-) diff --git a/alts/src/main/java/io/grpc/alts/internal/AltsProtocolNegotiator.java b/alts/src/main/java/io/grpc/alts/internal/AltsProtocolNegotiator.java index 1e754f7e4f2..2a77e15bbb7 100644 --- a/alts/src/main/java/io/grpc/alts/internal/AltsProtocolNegotiator.java +++ b/alts/src/main/java/io/grpc/alts/internal/AltsProtocolNegotiator.java @@ -223,6 +223,7 @@ public AsciiString scheme() { return SCHEME; } + @SuppressWarnings("deprecation") @Override public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) { ChannelHandler gnh = InternalProtocolNegotiators.grpcNegotiationHandler(grpcHandler); diff --git a/core/src/main/java/io/grpc/internal/GrpcAttributes.java b/core/src/main/java/io/grpc/internal/GrpcAttributes.java index 2ff4e7cff99..b7210a16778 100644 --- a/core/src/main/java/io/grpc/internal/GrpcAttributes.java +++ b/core/src/main/java/io/grpc/internal/GrpcAttributes.java @@ -38,6 +38,12 @@ public final class GrpcAttributes { public static final Attributes.Key> NAME_RESOLVER_SERVICE_CONFIG = Attributes.Key.create("service-config"); + /** + * Attribute key for gRPC LB server addresses. + * + *

Deprecated: this will be used for grpclb specific logic, which will be moved out of core. + */ + @Deprecated @NameResolver.ResolutionResultAttr public static final Attributes.Key> ATTR_LB_ADDRS = Attributes.Key.create("io.grpc.grpclb.lbAddrs"); @@ -45,7 +51,10 @@ public final class GrpcAttributes { /** * The naming authority of a gRPC LB server address. It is an address-group-level attribute, * present when the address group is a LoadBalancer. + * + *

Deprecated: this will be used for grpclb specific logic, which will be moved out of core. */ + @Deprecated @EquivalentAddressGroup.Attr public static final Attributes.Key ATTR_LB_ADDR_AUTHORITY = Attributes.Key.create("io.grpc.grpclb.lbAddrAuthority"); diff --git a/core/src/main/java/io/grpc/internal/JndiResourceResolverFactory.java b/core/src/main/java/io/grpc/internal/JndiResourceResolverFactory.java index 518393b43be..346ac1534c1 100644 --- a/core/src/main/java/io/grpc/internal/JndiResourceResolverFactory.java +++ b/core/src/main/java/io/grpc/internal/JndiResourceResolverFactory.java @@ -129,6 +129,7 @@ public List resolveTxt(String serviceConfigHostname) throws NamingExcept return Collections.unmodifiableList(serviceConfigTxtRecords); } + @SuppressWarnings("deprecation") @Override public List resolveSrv( AddressResolver addressResolver, String grpclbHostname) throws Exception { diff --git a/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java b/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java index 0d3b26ed180..26cbc9dd7f1 100644 --- a/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java +++ b/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java @@ -561,6 +561,7 @@ public List resolveSrv(AddressResolver addressResolver, assertThat(ac.getValue().getServiceConfig()).isNull(); } + @SuppressWarnings("deprecation") @Test public void resolve_balancerAddrsAsAttributes() throws Exception { InetAddress backendAddr = InetAddress.getByAddress(new byte[] {127, 0, 0, 0}); diff --git a/core/src/test/java/io/grpc/internal/JndiResourceResolverTest.java b/core/src/test/java/io/grpc/internal/JndiResourceResolverTest.java index c2e9111a50c..965ef8f51cf 100644 --- a/core/src/test/java/io/grpc/internal/JndiResourceResolverTest.java +++ b/core/src/test/java/io/grpc/internal/JndiResourceResolverTest.java @@ -24,7 +24,6 @@ import io.grpc.Attributes; import io.grpc.EquivalentAddressGroup; import io.grpc.internal.DnsNameResolver.AddressResolver; -import io.grpc.internal.GrpcAttributes; import io.grpc.internal.JndiResourceResolverFactory.JndiRecordFetcher; import io.grpc.internal.JndiResourceResolverFactory.JndiResourceResolver; import io.grpc.internal.JndiResourceResolverFactory.RecordFetcher; @@ -81,6 +80,7 @@ public void txtRecordLookup() throws Exception { assertThat(resolver.resolveTxt("service.example.com")).isEqualTo(golden); } + @SuppressWarnings("deprecation") @Test public void srvRecordLookup() throws Exception { AddressResolver addressResolver = mock(AddressResolver.class); diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbConstants.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbConstants.java index 0c02e93884f..db5e84f08c6 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbConstants.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbConstants.java @@ -42,10 +42,12 @@ public final class GrpclbConstants { static final Attributes.Key TOKEN_ATTRIBUTE_KEY = Attributes.Key.create("lb-token"); + @SuppressWarnings("deprecation") @EquivalentAddressGroup.Attr static final Attributes.Key> ATTR_LB_ADDRS = io.grpc.internal.GrpcAttributes.ATTR_LB_ADDRS; + @SuppressWarnings("deprecation") @EquivalentAddressGroup.Attr static final Attributes.Key ATTR_LB_ADDR_AUTHORITY = io.grpc.internal.GrpcAttributes.ATTR_LB_ADDR_AUTHORITY; diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java index df9282e32bc..6174fb5627d 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java @@ -103,7 +103,7 @@ public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { if (newLbAddresses != null) { for (EquivalentAddressGroup lbAddr : newLbAddresses) { - String lbAddrAuthority = lbAddr.getAttributes().get(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY); + String lbAddrAuthority = lbAddr.getAttributes().get(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY); if (lbAddrAuthority == null) { throw new AssertionError( "This is a bug: LB address " + lbAddr + " does not have an authority."); diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java index 925eb2f04e7..132f5fde5ce 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java @@ -793,7 +793,7 @@ private LbAddressGroup flattenLbAddressGroups(List groupList) { // actually used in the normal case. https://github.com/grpc/grpc-java/issues/4618 should allow // this to be more obvious. Attributes attrs = Attributes.newBuilder() - .set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, authority) + .set(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY, authority) .build(); return new LbAddressGroup(flattenEquivalentAddressGroup(eags, attrs), authority); } diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index 4e0f1063937..db424a4a8e3 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -2401,7 +2401,7 @@ private static String lbAuthority(int unused) { private static Attributes lbAttributes(String authority) { return Attributes.newBuilder() - .set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, authority) + .set(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY, authority) .build(); } diff --git a/xds/src/main/java/io/grpc/xds/FallbackLb.java b/xds/src/main/java/io/grpc/xds/FallbackLb.java index 41486e56eb7..380a22dba19 100644 --- a/xds/src/main/java/io/grpc/xds/FallbackLb.java +++ b/xds/src/main/java/io/grpc/xds/FallbackLb.java @@ -61,6 +61,7 @@ protected LoadBalancer delegate() { return fallbackPolicyLb; } + @SuppressWarnings("deprecation") @Override public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { Attributes attributes = resolvedAddresses.getAttributes(); @@ -113,6 +114,8 @@ public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { List servers = resolvedAddresses.getAddresses(); // Some addresses in the list may be grpclb-v1 balancer addresses, so if the fallback policy // does not support grpclb-v1 balancer addresses, then we need to exclude them from the list. + // TODO(chengyuanzhang): delete the following logic after changing internal resolver + // to not include grpclb server addresses. if (!newFallbackPolicyName.equals("grpclb") && !newFallbackPolicyName.equals(XDS_POLICY_NAME)) { ImmutableList.Builder backends = ImmutableList.builder(); for (EquivalentAddressGroup eag : resolvedAddresses.getAddresses()) {