diff --git a/alts/src/main/java/io/grpc/alts/internal/AltsProtocolNegotiator.java b/alts/src/main/java/io/grpc/alts/internal/AltsProtocolNegotiator.java index 1e754f7e4f2..2a77e15bbb7 100644 --- a/alts/src/main/java/io/grpc/alts/internal/AltsProtocolNegotiator.java +++ b/alts/src/main/java/io/grpc/alts/internal/AltsProtocolNegotiator.java @@ -223,6 +223,7 @@ public AsciiString scheme() { return SCHEME; } + @SuppressWarnings("deprecation") @Override public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) { ChannelHandler gnh = InternalProtocolNegotiators.grpcNegotiationHandler(grpcHandler); diff --git a/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java b/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java index d0a71b3ded2..bb12bfb3c6b 100644 --- a/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java +++ b/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java @@ -41,18 +41,13 @@ import io.grpc.Status; import io.grpc.internal.ServiceConfigUtil.LbConfig; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.logging.Logger; import javax.annotation.Nullable; // TODO(creamsoup) fully deprecate LoadBalancer.ATTR_LOAD_BALANCING_CONFIG @SuppressWarnings("deprecation") public final class AutoConfiguredLoadBalancerFactory { - private static final Logger logger = - Logger.getLogger(AutoConfiguredLoadBalancerFactory.class.getName()); - private static final String GRPCLB_POLICY_NAME = "grpclb"; private final LoadBalancerRegistry registry; private final String defaultPolicy; @@ -92,7 +87,6 @@ public final class AutoConfiguredLoadBalancer { private final Helper helper; private LoadBalancer delegate; private LoadBalancerProvider delegateProvider; - private boolean roundRobinDueToGrpclbDepMissing; AutoConfiguredLoadBalancer(Helper helper) { this.helper = helper; @@ -125,48 +119,53 @@ Status tryHandleResolvedAddresses(ResolvedAddresses resolvedAddresses) { } PolicySelection policySelection = (PolicySelection) resolvedAddresses.getLoadBalancingPolicyConfig(); - ResolvedPolicySelection resolvedSelection; - try { - resolvedSelection = resolveLoadBalancerProvider(servers, policySelection); - } catch (PolicyException e) { - Status s = Status.INTERNAL.withDescription(e.getMessage()); - helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, new FailingPicker(s)); - delegate.shutdown(); - delegateProvider = null; - delegate = new NoopLoadBalancer(); - return Status.OK; + if (policySelection == null) { + LoadBalancerProvider defaultProvider; + try { + defaultProvider = getProviderOrThrow(defaultPolicy, "using default policy"); + } catch (PolicyException e) { + Status s = Status.INTERNAL.withDescription(e.getMessage()); + helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, new FailingPicker(s)); + delegate.shutdown(); + delegateProvider = null; + delegate = new NoopLoadBalancer(); + return Status.OK; + } + policySelection = + new PolicySelection(defaultProvider, /* rawConfig= */ null, /* config= */ null); } - PolicySelection selection = resolvedSelection.policySelection; if (delegateProvider == null - || !selection.provider.getPolicyName().equals(delegateProvider.getPolicyName())) { + || !policySelection.provider.getPolicyName().equals(delegateProvider.getPolicyName())) { helper.updateBalancingState(ConnectivityState.CONNECTING, new EmptyPicker()); delegate.shutdown(); - delegateProvider = selection.provider; + delegateProvider = policySelection.provider; LoadBalancer old = delegate; delegate = delegateProvider.newLoadBalancer(helper); helper.getChannelLogger().log( ChannelLogLevel.INFO, "Load balancer changed from {0} to {1}", old.getClass().getSimpleName(), delegate.getClass().getSimpleName()); } - Object lbConfig = selection.config; + Object lbConfig = policySelection.config; if (lbConfig != null) { helper.getChannelLogger().log( - ChannelLogLevel.DEBUG, "Load-balancing config: {0}", selection.config); + ChannelLogLevel.DEBUG, "Load-balancing config: {0}", policySelection.config); attributes = - attributes.toBuilder().set(ATTR_LOAD_BALANCING_CONFIG, selection.rawConfig).build(); + attributes.toBuilder() + .set(ATTR_LOAD_BALANCING_CONFIG, policySelection.rawConfig) + .build(); } LoadBalancer delegate = getDelegate(); - if (resolvedSelection.serverList.isEmpty() + if (resolvedAddresses.getAddresses().isEmpty() && !delegate.canHandleEmptyAddressListFromNameResolution()) { return Status.UNAVAILABLE.withDescription( "NameResolver returned no usable address. addrs=" + servers + ", attrs=" + attributes); } else { delegate.handleResolvedAddresses( ResolvedAddresses.newBuilder() - .setAddresses(resolvedSelection.serverList) + .setAddresses(resolvedAddresses.getAddresses()) .setAttributes(attributes) .setLoadBalancingPolicyConfig(lbConfig) .build()); @@ -206,78 +205,6 @@ void setDelegate(LoadBalancer lb) { LoadBalancerProvider getDelegateProvider() { return delegateProvider; } - - /** - * Resolves a load balancer based on given criteria. If policySelection is {@code null} and - * given servers contains any gRPC LB addresses, it will fall back to "grpclb". If no gRPC LB - * addresses are not present, it will fall back to {@link #defaultPolicy}. - * - * @param servers The list of servers reported - * @param policySelection the selected policy from raw service config - * @return the resolved policy selection - */ - @VisibleForTesting - ResolvedPolicySelection resolveLoadBalancerProvider( - List servers, @Nullable PolicySelection policySelection) - throws PolicyException { - // Check for balancer addresses - boolean haveBalancerAddress = false; - List backendAddrs = new ArrayList<>(); - for (EquivalentAddressGroup s : servers) { - if (s.getAttributes().get(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY) != null) { - haveBalancerAddress = true; - } else { - backendAddrs.add(s); - } - } - - if (policySelection != null) { - String policyName = policySelection.provider.getPolicyName(); - return new ResolvedPolicySelection( - policySelection, policyName.equals(GRPCLB_POLICY_NAME) ? servers : backendAddrs); - } - - if (haveBalancerAddress) { - // This is a special case where the existence of balancer address in the resolved address - // selects "grpclb" policy if the service config couldn't select a policy - LoadBalancerProvider grpclbProvider = registry.getProvider(GRPCLB_POLICY_NAME); - if (grpclbProvider == null) { - if (backendAddrs.isEmpty()) { - throw new PolicyException( - "Received ONLY balancer addresses but grpclb runtime is missing"); - } - if (!roundRobinDueToGrpclbDepMissing) { - // We don't log the warning every time we have an update. - roundRobinDueToGrpclbDepMissing = true; - String errorMsg = "Found balancer addresses but grpclb runtime is missing." - + " Will use round_robin. Please include grpc-grpclb in your runtime dependencies."; - helper.getChannelLogger().log(ChannelLogLevel.ERROR, errorMsg); - logger.warning(errorMsg); - } - return new ResolvedPolicySelection( - new PolicySelection( - getProviderOrThrow( - "round_robin", "received balancer addresses but grpclb runtime is missing"), - /* rawConfig = */ null, - /* config= */ null), - backendAddrs); - } - return new ResolvedPolicySelection( - new PolicySelection( - grpclbProvider, /* rawConfig= */ null, /* config= */ null), servers); - } - // No balancer address this time. If balancer address shows up later, we want to make sure - // the warning is logged one more time. - roundRobinDueToGrpclbDepMissing = false; - - // No config nor balancer address. Use default. - return new ResolvedPolicySelection( - new PolicySelection( - getProviderOrThrow(defaultPolicy, "using default policy"), - /* rawConfig= */ null, - /* config= */ null), - servers); - } } private LoadBalancerProvider getProviderOrThrow(String policy, String choiceReason) @@ -406,26 +333,6 @@ public String toString() { } } - @VisibleForTesting - static final class ResolvedPolicySelection { - final PolicySelection policySelection; - final List serverList; - - ResolvedPolicySelection( - PolicySelection policySelection, List serverList) { - this.policySelection = checkNotNull(policySelection, "policySelection"); - this.serverList = Collections.unmodifiableList(checkNotNull(serverList, "serverList")); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("policySelection", policySelection) - .add("serverList", serverList) - .toString(); - } - } - private static final class EmptyPicker extends SubchannelPicker { @Override diff --git a/core/src/main/java/io/grpc/internal/DnsNameResolver.java b/core/src/main/java/io/grpc/internal/DnsNameResolver.java index f81c577820f..9a6224d4ac7 100644 --- a/core/src/main/java/io/grpc/internal/DnsNameResolver.java +++ b/core/src/main/java/io/grpc/internal/DnsNameResolver.java @@ -298,14 +298,12 @@ public void run() { for (InetAddress inetAddr : resolutionResults.addresses) { servers.add(new EquivalentAddressGroup(new InetSocketAddress(inetAddr, port))); } - servers.addAll(resolutionResults.balancerAddresses); - if (servers.isEmpty()) { - savedListener.onError(Status.UNAVAILABLE.withDescription( - "No DNS backend or balancer addresses found for " + host)); - return; - } ResolutionResult.Builder resultBuilder = ResolutionResult.newBuilder().setAddresses(servers); + Attributes.Builder attributesBuilder = Attributes.newBuilder(); + if (!resolutionResults.balancerAddresses.isEmpty()) { + attributesBuilder.set(GrpcAttributes.ATTR_LB_ADDRS, resolutionResults.balancerAddresses); + } if (!resolutionResults.txtRecords.isEmpty()) { ConfigOrError rawServiceConfig = parseServiceConfig(resolutionResults.txtRecords, random, getLocalHostname()); @@ -319,17 +317,14 @@ public void run() { Map verifiedRawServiceConfig = (Map) rawServiceConfig.getConfig(); ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(verifiedRawServiceConfig); - resultBuilder - .setAttributes( - Attributes.newBuilder() - .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, verifiedRawServiceConfig) - .build()) - .setServiceConfig(parsedServiceConfig); + resultBuilder.setServiceConfig(parsedServiceConfig); + attributesBuilder + .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, verifiedRawServiceConfig); } } else { logger.log(Level.FINE, "No TXT records found for {0}", new Object[]{host}); } - savedListener.onResult(resultBuilder.build()); + savedListener.onResult(resultBuilder.setAttributes(attributesBuilder.build()).build()); } } diff --git a/core/src/main/java/io/grpc/internal/GrpcAttributes.java b/core/src/main/java/io/grpc/internal/GrpcAttributes.java index b8138d7f26f..b7210a16778 100644 --- a/core/src/main/java/io/grpc/internal/GrpcAttributes.java +++ b/core/src/main/java/io/grpc/internal/GrpcAttributes.java @@ -21,6 +21,7 @@ import io.grpc.Grpc; import io.grpc.NameResolver; import io.grpc.SecurityLevel; +import java.util.List; import java.util.Map; /** @@ -37,10 +38,23 @@ public final class GrpcAttributes { public static final Attributes.Key> NAME_RESOLVER_SERVICE_CONFIG = Attributes.Key.create("service-config"); + /** + * Attribute key for gRPC LB server addresses. + * + *

Deprecated: this will be used for grpclb specific logic, which will be moved out of core. + */ + @Deprecated + @NameResolver.ResolutionResultAttr + public static final Attributes.Key> ATTR_LB_ADDRS = + Attributes.Key.create("io.grpc.grpclb.lbAddrs"); + /** * The naming authority of a gRPC LB server address. It is an address-group-level attribute, * present when the address group is a LoadBalancer. + * + *

Deprecated: this will be used for grpclb specific logic, which will be moved out of core. */ + @Deprecated @EquivalentAddressGroup.Attr public static final Attributes.Key ATTR_LB_ADDR_AUTHORITY = Attributes.Key.create("io.grpc.grpclb.lbAddrAuthority"); diff --git a/core/src/main/java/io/grpc/internal/JndiResourceResolverFactory.java b/core/src/main/java/io/grpc/internal/JndiResourceResolverFactory.java index 518393b43be..346ac1534c1 100644 --- a/core/src/main/java/io/grpc/internal/JndiResourceResolverFactory.java +++ b/core/src/main/java/io/grpc/internal/JndiResourceResolverFactory.java @@ -129,6 +129,7 @@ public List resolveTxt(String serviceConfigHostname) throws NamingExcept return Collections.unmodifiableList(serviceConfigTxtRecords); } + @SuppressWarnings("deprecation") @Override public List resolveSrv( AddressResolver addressResolver, String grpclbHostname) throws Exception { diff --git a/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java b/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java index c0918151790..fda33cdf07e 100644 --- a/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java +++ b/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java @@ -19,13 +19,10 @@ import static com.google.common.truth.Truth.assertThat; import static io.grpc.LoadBalancer.ATTR_LOAD_BALANCING_CONFIG; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.same; -import static org.mockito.ArgumentMatchers.startsWith; -import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -50,23 +47,18 @@ import io.grpc.LoadBalancer.SubchannelStateListener; import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerRegistry; -import io.grpc.ManagedChannel; import io.grpc.NameResolver.ConfigOrError; import io.grpc.Status; -import io.grpc.SynchronizationContext; import io.grpc.grpclb.GrpclbLoadBalancerProvider; import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer; -import io.grpc.internal.AutoConfiguredLoadBalancerFactory.PolicyException; import io.grpc.internal.AutoConfiguredLoadBalancerFactory.PolicySelection; -import io.grpc.internal.AutoConfiguredLoadBalancerFactory.ResolvedPolicySelection; import io.grpc.util.ForwardingLoadBalancerHelper; +import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -308,37 +300,42 @@ public void handleResolvedAddressGroups_propagateLbConfigToDelegate() throws Exc } @Test - public void handleResolvedAddressGroups_propagateOnlyBackendAddrsToDelegate() throws Exception { - // This case only happens when grpclb is missing. We will use a local registry - LoadBalancerRegistry registry = new LoadBalancerRegistry(); - registry.register(new PickFirstLoadBalancerProvider()); - registry.register( - new FakeLoadBalancerProvider( - "round_robin", testLbBalancer, /* nextParsedLbPolicyConfig= */ null)); + public void handleResolvedAddressGroups_propagateAddrsToDelegate() throws Exception { + Map rawServiceConfig = + parseConfig("{\"loadBalancingConfig\": [ {\"test_lb\": { \"setting1\": \"high\" } } ] }"); + ConfigOrError lbConfigs = lbf.parseLoadBalancerPolicy(rawServiceConfig, channelLogger); + assertThat(lbConfigs.getConfig()).isNotNull(); - final List servers = - Arrays.asList( - new EquivalentAddressGroup(new SocketAddress(){}), - new EquivalentAddressGroup( - new SocketAddress(){}, - Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "ok").build())); Helper helper = new TestHelper(); - AutoConfiguredLoadBalancer lb = new AutoConfiguredLoadBalancerFactory( - registry, GrpcUtil.DEFAULT_LB_POLICY).newLoadBalancer(helper); + AutoConfiguredLoadBalancer lb = lbf.newLoadBalancer(helper); + List servers = + Collections.singletonList(new EquivalentAddressGroup(new InetSocketAddress(8080){})); Status handleResult = lb.tryHandleResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(servers) - .setAttributes(Attributes.EMPTY) + .setLoadBalancingPolicyConfig(lbConfigs.getConfig()) .build()); + verify(testLbBalancerProvider).newLoadBalancer(same(helper)); assertThat(handleResult.getCode()).isEqualTo(Status.Code.OK); assertThat(lb.getDelegate()).isSameInstanceAs(testLbBalancer); - verify(testLbBalancer).handleResolvedAddresses( + ArgumentCaptor resultCaptor = + ArgumentCaptor.forClass(ResolvedAddresses.class); + verify(testLbBalancer).handleResolvedAddresses(resultCaptor.capture()); + assertThat(resultCaptor.getValue().getAddresses()).containsExactlyElementsIn(servers).inOrder(); + + servers = + Collections.singletonList(new EquivalentAddressGroup(new InetSocketAddress(9090){})); + handleResult = lb.tryHandleResolvedAddresses( ResolvedAddresses.newBuilder() - .setAddresses(Collections.singletonList(servers.get(0))) - .setAttributes(Attributes.EMPTY) + .setAddresses(servers) + .setLoadBalancingPolicyConfig(lbConfigs.getConfig()) .build()); + + assertThat(handleResult.getCode()).isEqualTo(Status.Code.OK); + verify(testLbBalancer, times(2)).handleResolvedAddresses(resultCaptor.capture()); + assertThat(resultCaptor.getValue().getAddresses()).containsExactlyElementsIn(servers).inOrder(); } @Test @@ -392,267 +389,79 @@ public void handleResolvedAddressGroups_delegateAcceptsEmptyAddressList() } @Test - public void decideLoadBalancerProvider_noBalancerAddresses_noServiceConfig_pickFirst() - throws Exception { - AutoConfiguredLoadBalancer lb = lbf.newLoadBalancer(new TestHelper()); - PolicySelection policySelection = null; - List servers = - Collections.singletonList(new EquivalentAddressGroup(new SocketAddress(){})); - ResolvedPolicySelection selection = lb.resolveLoadBalancerProvider(servers, policySelection); - - assertThat(selection.policySelection.provider) - .isInstanceOf(PickFirstLoadBalancerProvider.class); - assertThat(selection.serverList).isEqualTo(servers); - assertThat(selection.policySelection.config).isNull(); - verifyZeroInteractions(channelLogger); - } - - @Test - public void decideLoadBalancerProvider_noBalancerAddresses_noServiceConfig_customDefault() - throws Exception { - AutoConfiguredLoadBalancer lb = new AutoConfiguredLoadBalancerFactory("test_lb") - .newLoadBalancer(new TestHelper()); - PolicySelection policySelection = null; - List servers = - Collections.singletonList(new EquivalentAddressGroup(new SocketAddress(){})); - ResolvedPolicySelection selection = lb.resolveLoadBalancerProvider(servers, policySelection); - - assertThat(selection.policySelection.provider).isSameInstanceAs(testLbBalancerProvider); - assertThat(selection.serverList).isEqualTo(servers); - assertThat(selection.policySelection.config).isNull(); - verifyZeroInteractions(channelLogger); - } - - @Test - public void decideLoadBalancerProvider_oneBalancer_noServiceConfig_grpclb() throws Exception { - AutoConfiguredLoadBalancer lb = lbf.newLoadBalancer(new TestHelper()); - PolicySelection policySelection = null; - List servers = - Collections.singletonList( - new EquivalentAddressGroup( - new SocketAddress(){}, - Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "ok").build())); - ResolvedPolicySelection selection = lb.resolveLoadBalancerProvider(servers, policySelection); - - assertThat(selection.policySelection.provider).isInstanceOf(GrpclbLoadBalancerProvider.class); - assertThat(selection.serverList).isEqualTo(servers); - assertThat(selection.policySelection.config).isNull(); - verifyZeroInteractions(channelLogger); - } - - @Test - public void decideLoadBalancerProvider_serviceConfigLbPolicy() throws Exception { - AutoConfiguredLoadBalancer lb = lbf.newLoadBalancer(new TestHelper()); - Map rawServiceConfig = - parseConfig("{\"loadBalancingPolicy\": \"round_robin\"}"); - - ConfigOrError lbConfig = lbf.parseLoadBalancerPolicy(rawServiceConfig, channelLogger); - assertThat(lbConfig.getConfig()).isNotNull(); - PolicySelection policySelection = (PolicySelection) lbConfig.getConfig(); - List servers = - Arrays.asList( - new EquivalentAddressGroup( - new SocketAddress(){}, - Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "ok").build()), - new EquivalentAddressGroup( - new SocketAddress(){})); - List backends = Arrays.asList(servers.get(1)); - ResolvedPolicySelection selection = lb.resolveLoadBalancerProvider(servers, policySelection); - - assertThat(selection.policySelection.provider.getClass().getName()).isEqualTo( - "io.grpc.util.SecretRoundRobinLoadBalancerProvider$Provider"); - assertThat(selection.serverList).isEqualTo(backends); - verifyZeroInteractions(channelLogger); - } - - @Test - public void decideLoadBalancerProvider_serviceConfigLbConfig() throws Exception { - AutoConfiguredLoadBalancer lb = lbf.newLoadBalancer(new TestHelper()); + public void handleResolvedAddressGroups_useSelectedLbPolicy() throws Exception { Map rawServiceConfig = parseConfig("{\"loadBalancingConfig\": [{\"round_robin\": {}}]}"); + ConfigOrError lbConfigs = lbf.parseLoadBalancerPolicy(rawServiceConfig, channelLogger); + assertThat(lbConfigs.getConfig()).isNotNull(); + assertThat(((PolicySelection) lbConfigs.getConfig()).provider.getClass().getName()) + .isEqualTo("io.grpc.util.SecretRoundRobinLoadBalancerProvider$Provider"); - ConfigOrError lbConfig = lbf.parseLoadBalancerPolicy(rawServiceConfig, channelLogger); - assertThat(lbConfig.getConfig()).isNotNull(); - PolicySelection policySelection = (PolicySelection) lbConfig.getConfig(); - List servers = - Arrays.asList( - new EquivalentAddressGroup( - new SocketAddress(){}, - Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "ok").build()), - new EquivalentAddressGroup( - new SocketAddress(){})); - List backends = Arrays.asList(servers.get(1)); - ResolvedPolicySelection selection = lb.resolveLoadBalancerProvider(servers, policySelection); - - assertThat(selection.policySelection.provider.getClass().getName()).isEqualTo( - "io.grpc.util.SecretRoundRobinLoadBalancerProvider$Provider"); - assertThat(selection.serverList).isEqualTo(backends); - verifyZeroInteractions(channelLogger); - } - - @Test - public void decideLoadBalancerProvider_grpclbConfigPropagated() throws Exception { - AutoConfiguredLoadBalancer lb = lbf.newLoadBalancer(new TestHelper()); - Map rawServiceConfig = - parseConfig( - "{\"loadBalancingConfig\": [" - + "{\"grpclb\": {\"childPolicy\": [ {\"pick_first\": {} } ] } }" - + "] }"); - ConfigOrError lbConfig = lbf.parseLoadBalancerPolicy(rawServiceConfig, channelLogger); - assertThat(lbConfig.getConfig()).isNotNull(); - PolicySelection policySelection = (PolicySelection) lbConfig.getConfig(); - - List servers = - Collections.singletonList( - new EquivalentAddressGroup( - new SocketAddress(){}, - Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "ok").build())); - ResolvedPolicySelection selection = lb.resolveLoadBalancerProvider(servers, policySelection); - - assertThat(selection.policySelection.provider).isInstanceOf(GrpclbLoadBalancerProvider.class); - assertThat(selection.serverList).isEqualTo(servers); - assertThat(selection.policySelection.config) - .isEqualTo(((PolicySelection) lbConfig.getConfig()).config); - verifyZeroInteractions(channelLogger); - } - - @Test - public void decideLoadBalancerProvider_policyUnavailButGrpclbAddressPresent() throws Exception { - AutoConfiguredLoadBalancer lb = lbf.newLoadBalancer(new TestHelper()); - - List servers = - Collections.singletonList( - new EquivalentAddressGroup( - new SocketAddress(){}, - Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "ok").build())); - ResolvedPolicySelection selection = lb.resolveLoadBalancerProvider(servers, null); - - assertThat(selection.policySelection.provider).isInstanceOf(GrpclbLoadBalancerProvider.class); - assertThat(selection.serverList).isEqualTo(servers); - assertThat(selection.policySelection.config).isNull(); - verifyZeroInteractions(channelLogger); - } - - @Test - public void decideLoadBalancerProvider_grpclbProviderNotFound_fallbackToRoundRobin() - throws Exception { - LoadBalancerRegistry registry = new LoadBalancerRegistry(); - registry.register(new PickFirstLoadBalancerProvider()); - LoadBalancerProvider fakeRoundRobinProvider = - new FakeLoadBalancerProvider("round_robin", testLbBalancer, null); - registry.register(fakeRoundRobinProvider); - AutoConfiguredLoadBalancer lb = new AutoConfiguredLoadBalancerFactory( - registry, GrpcUtil.DEFAULT_LB_POLICY).newLoadBalancer(new TestHelper()); - List servers = - Arrays.asList( - new EquivalentAddressGroup( - new SocketAddress(){}, - Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "ok").build()), - new EquivalentAddressGroup(new SocketAddress(){})); - ResolvedPolicySelection selection = lb.resolveLoadBalancerProvider(servers, null); - - assertThat(selection.policySelection.provider).isSameInstanceAs(fakeRoundRobinProvider); - assertThat(selection.policySelection.config).isNull(); - verify(channelLogger).log( - eq(ChannelLogLevel.ERROR), - startsWith("Found balancer addresses but grpclb runtime is missing")); - - // Called for the second time, the warning is only logged once - selection = lb.resolveLoadBalancerProvider(servers, null); - - assertThat(selection.policySelection.provider).isSameInstanceAs(fakeRoundRobinProvider); - assertThat(selection.policySelection.config).isNull(); - // Balancer addresses are filtered out in the server list passed to round_robin - assertThat(selection.serverList).containsExactly(servers.get(1)); - verifyNoMoreInteractions(channelLogger);; + final List servers = + Collections.singletonList(new EquivalentAddressGroup(new SocketAddress(){})); + Helper helper = new TestHelper() { + @Override + public Subchannel createSubchannel(CreateSubchannelArgs args) { + assertThat(args.getAddresses()).isEqualTo(servers); + return new TestSubchannel(args); + } + }; + AutoConfiguredLoadBalancer lb = lbf.newLoadBalancer(helper); + Status handleResult = lb.tryHandleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers) + .setLoadBalancingPolicyConfig(lbConfigs.getConfig()) + .build()); + assertThat(handleResult.getCode()).isEqualTo(Status.Code.OK); + assertThat(lb.getDelegate().getClass().getName()) + .isEqualTo("io.grpc.util.RoundRobinLoadBalancer"); } @Test - public void decideLoadBalancerProvider_grpclbProviderNotFound_noBackendAddress() - throws Exception { - LoadBalancerRegistry registry = new LoadBalancerRegistry(); - registry.register(new PickFirstLoadBalancerProvider()); - registry.register(new FakeLoadBalancerProvider("round_robin", testLbBalancer, null)); - AutoConfiguredLoadBalancer lb = new AutoConfiguredLoadBalancerFactory( - registry, GrpcUtil.DEFAULT_LB_POLICY).newLoadBalancer(new TestHelper()); - List servers = - Collections.singletonList( - new EquivalentAddressGroup( - new SocketAddress(){}, - Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "ok").build())); - try { - lb.resolveLoadBalancerProvider(servers, null); - fail("Should throw"); - } catch (PolicyException e) { - assertThat(e) - .hasMessageThat() - .isEqualTo("Received ONLY balancer addresses but grpclb runtime is missing"); - } + public void handleResolvedAddressGroups_noLbPolicySelected_defaultToPickFirst() { + final List servers = + Collections.singletonList(new EquivalentAddressGroup(new SocketAddress(){})); + Helper helper = new TestHelper() { + @Override + public Subchannel createSubchannel(CreateSubchannelArgs args) { + assertThat(args.getAddresses()).isEqualTo(servers); + return new TestSubchannel(args); + } + }; + AutoConfiguredLoadBalancer lb = lbf.newLoadBalancer(helper); + Status handleResult = lb.tryHandleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers) + .setLoadBalancingPolicyConfig(null) + .build()); + assertThat(handleResult.getCode()).isEqualTo(Status.Code.OK); + assertThat(lb.getDelegate()).isInstanceOf(PickFirstLoadBalancer.class); } @Test - public void decideLoadBalancerProvider_serviceConfigLbConfigOverridesDefault() throws Exception { - AutoConfiguredLoadBalancer lb = lbf.newLoadBalancer(new TestHelper()); - Map rawServiceConfig = - parseConfig("{\"loadBalancingConfig\": [ {\"round_robin\": {} } ] }"); - ConfigOrError lbConfigs = lbf.parseLoadBalancerPolicy(rawServiceConfig, channelLogger); - assertThat(lbConfigs.getConfig()).isNotNull(); - PolicySelection policySelection = (PolicySelection) lbConfigs.getConfig(); + public void handleResolvedAddressGroups_noLbPolicySelected_defaultToCustomDefault() { + AutoConfiguredLoadBalancer lb = new AutoConfiguredLoadBalancerFactory("test_lb") + .newLoadBalancer(new TestHelper()); List servers = Collections.singletonList(new EquivalentAddressGroup(new SocketAddress(){})); - - ResolvedPolicySelection selection = lb.resolveLoadBalancerProvider(servers, policySelection); - - assertThat(selection.policySelection.provider.getClass().getName()).isEqualTo( - "io.grpc.util.SecretRoundRobinLoadBalancerProvider$Provider"); - verifyZeroInteractions(channelLogger); + Status handleResult = lb.tryHandleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers) + .setLoadBalancingPolicyConfig(null) + .build()); + assertThat(handleResult.getCode()).isEqualTo(Status.Code.OK); + assertThat(lb.getDelegate()).isSameInstanceAs(testLbBalancer); } @Test public void channelTracing_lbPolicyChanged() throws Exception { - final FakeClock clock = new FakeClock(); List servers = Collections.singletonList(new EquivalentAddressGroup(new SocketAddress(){})); Helper helper = new TestHelper() { - @Override - @Deprecated - public Subchannel createSubchannel(List addrs, Attributes attrs) { - return new TestSubchannel(CreateSubchannelArgs.newBuilder() - .setAddresses(addrs) - .setAttributes(attrs) - .build()); - } - @Override public Subchannel createSubchannel(CreateSubchannelArgs args) { return new TestSubchannel(args); } - - @Override - public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) { - return mock(ManagedChannel.class, RETURNS_DEEP_STUBS); - } - - @Override - public String getAuthority() { - return "fake_authority"; - } - - @Override - public SynchronizationContext getSynchronizationContext() { - return new SynchronizationContext( - new Thread.UncaughtExceptionHandler() { - @Override - public void uncaughtException(Thread t, Throwable e) { - throw new AssertionError(e); - } - }); - } - - @Override - public ScheduledExecutorService getScheduledExecutorService() { - return clock.getScheduledExecutorService(); - } }; AutoConfiguredLoadBalancer lb = @@ -705,23 +514,6 @@ public ScheduledExecutorService getScheduledExecutorService() { eq("Load-balancing config: {0}"), eq(testLbParsedConfig.getConfig())); verifyNoMoreInteractions(channelLogger); - - servers = Collections.singletonList(new EquivalentAddressGroup( - new SocketAddress(){}, - Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "ok").build())); - handleResult = lb.tryHandleResolvedAddresses( - ResolvedAddresses.newBuilder() - .setAddresses(servers) - .setAttributes(Attributes.EMPTY) - .build()); - - assertThat(handleResult.getCode()).isEqualTo(Status.Code.OK); - verify(channelLogger).log( - eq(ChannelLogLevel.INFO), - eq("Load balancer changed from {0} to {1}"), - eq(testLbBalancer.getClass().getSimpleName()), eq("GrpclbLoadBalancer")); - - verifyNoMoreInteractions(channelLogger); } @Test @@ -834,6 +626,21 @@ public void parseLoadBalancerConfig_someProvidesAreNotAvailable() throws Excepti eq(new ArrayList<>(Collections.singletonList("magic_balancer")))); } + @Test + public void parseLoadBalancerConfig_lbConfigPropagated() throws Exception { + Map rawServiceConfig = + parseConfig( + "{\"loadBalancingConfig\": [" + + "{\"grpclb\": {\"childPolicy\": [ {\"pick_first\": {} } ] } }" + + "] }"); + ConfigOrError parsed = lbf.parseLoadBalancerPolicy(rawServiceConfig, channelLogger); + assertThat(parsed).isNotNull(); + assertThat(parsed.getConfig()).isNotNull(); + PolicySelection policySelection = (PolicySelection) parsed.getConfig(); + assertThat(policySelection.config).isNotNull(); + assertThat(policySelection.provider).isInstanceOf(GrpclbLoadBalancerProvider.class); + verifyZeroInteractions(channelLogger); + } public static class ForwardingLoadBalancer extends LoadBalancer { private final LoadBalancer delegate; diff --git a/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java b/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java index 192d03d3341..26cbc9dd7f1 100644 --- a/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java +++ b/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java @@ -36,6 +36,7 @@ import com.google.common.collect.Iterables; import com.google.common.net.InetAddresses; import com.google.common.testing.FakeTicker; +import io.grpc.Attributes; import io.grpc.ChannelLogger; import io.grpc.EquivalentAddressGroup; import io.grpc.HttpConnectProxiedSocketAddress; @@ -46,7 +47,6 @@ import io.grpc.ProxyDetector; import io.grpc.StaticTestingClassLoader; import io.grpc.Status; -import io.grpc.Status.Code; import io.grpc.SynchronizationContext; import io.grpc.internal.DnsNameResolver.AddressResolver; import io.grpc.internal.DnsNameResolver.ResolutionResults; @@ -154,7 +154,8 @@ private DnsNameResolver newResolver(String name, int defaultPort) { private DnsNameResolver newResolver(String name, int defaultPort, boolean isAndroid) { return newResolver( - name, defaultPort, GrpcUtil.NOOP_PROXY_DETECTOR, Stopwatch.createUnstarted(), isAndroid); + name, defaultPort, GrpcUtil.NOOP_PROXY_DETECTOR, Stopwatch.createUnstarted(), + isAndroid, false); } private DnsNameResolver newResolver( @@ -162,7 +163,7 @@ private DnsNameResolver newResolver( int defaultPort, ProxyDetector proxyDetector, Stopwatch stopwatch) { - return newResolver(name, defaultPort, proxyDetector, stopwatch, false); + return newResolver(name, defaultPort, proxyDetector, stopwatch, false, false); } private DnsNameResolver newResolver( @@ -170,7 +171,8 @@ private DnsNameResolver newResolver( final int defaultPort, final ProxyDetector proxyDetector, Stopwatch stopwatch, - boolean isAndroid) { + boolean isAndroid, + boolean enableSrv) { NameResolver.Args args = NameResolver.Args.newBuilder() .setDefaultPort(defaultPort) @@ -179,19 +181,34 @@ private DnsNameResolver newResolver( .setServiceConfigParser(mock(ServiceConfigParser.class)) .setChannelLogger(mock(ChannelLogger.class)) .build(); - return newResolver(name, stopwatch, isAndroid, args); + return newResolver(name, stopwatch, isAndroid, args, enableSrv); } private DnsNameResolver newResolver( String name, Stopwatch stopwatch, boolean isAndroid, NameResolver.Args args) { + return newResolver(name, stopwatch, isAndroid, args, /* enableSrv= */ false); + } + + private DnsNameResolver newResolver( + String name, + Stopwatch stopwatch, + boolean isAndroid, + NameResolver.Args args, + boolean enableSrv) { DnsNameResolver dnsResolver = new DnsNameResolver( - null, name, args, fakeExecutorResource, stopwatch, isAndroid, /* enableSrv= */ false); + null, name, args, fakeExecutorResource, stopwatch, isAndroid, enableSrv); // By default, using the mocked ResourceResolver to avoid I/O dnsResolver.setResourceResolver(new JndiResourceResolver(recordFetcher)); return dnsResolver; } + private DnsNameResolver newSrvEnabledResolver(String name, int defaultPort) { + return newResolver( + name, defaultPort, GrpcUtil.NOOP_PROXY_DETECTOR, Stopwatch.createUnstarted(), + false, true); + } + @Before public void setUp() { DnsNameResolver.enableJndi = true; @@ -363,26 +380,6 @@ public void execute(Runnable command) { assertThat(executions.get()).isEqualTo(1); } - @Test - public void resolveAll_failsOnEmptyResult() { - DnsNameResolver nr = newResolver("dns:///addr.fake:1234", 443); - nr.setAddressResolver(new AddressResolver() { - @Override - public List resolveAddress(String host) throws Exception { - return Collections.emptyList(); - } - }); - - nr.start(mockListener); - assertThat(fakeExecutor.runDueTasks()).isEqualTo(1); - - ArgumentCaptor ac = ArgumentCaptor.forClass(Status.class); - verify(mockListener).onError(ac.capture()); - verifyNoMoreInteractions(mockListener); - assertThat(ac.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); - assertThat(ac.getValue().getDescription()).contains("No DNS backend or balancer addresses"); - } - @Test public void resolve_cacheForever() throws Exception { System.setProperty(DnsNameResolver.NETWORKADDRESS_CACHE_TTL_PROPERTY, "-1"); @@ -531,6 +528,75 @@ private void resolveDefaultValue() throws Exception { verify(mockResolver, times(2)).resolveAddress(anyString()); } + @Test + public void resolve_emptyResult() { + DnsNameResolver nr = newResolver("dns:///addr.fake:1234", 443); + nr.setAddressResolver(new AddressResolver() { + @Override + public List resolveAddress(String host) throws Exception { + return Collections.emptyList(); + } + }); + nr.setResourceResolver(new ResourceResolver() { + @Override + public List resolveTxt(String host) throws Exception { + return Collections.emptyList(); + } + + @Override + public List resolveSrv(AddressResolver addressResolver, String host) + throws Exception { + return Collections.emptyList(); + } + }); + + nr.start(mockListener); + assertThat(fakeExecutor.runDueTasks()).isEqualTo(1); + + ArgumentCaptor ac = ArgumentCaptor.forClass(ResolutionResult.class); + verify(mockListener).onResult(ac.capture()); + verifyNoMoreInteractions(mockListener); + assertThat(ac.getValue().getAddresses()).isEmpty(); + assertThat(ac.getValue().getAttributes()).isEqualTo(Attributes.EMPTY); + assertThat(ac.getValue().getServiceConfig()).isNull(); + } + + @SuppressWarnings("deprecation") + @Test + public void resolve_balancerAddrsAsAttributes() throws Exception { + InetAddress backendAddr = InetAddress.getByAddress(new byte[] {127, 0, 0, 0}); + final EquivalentAddressGroup balancerAddr = + new EquivalentAddressGroup( + new SocketAddress() {}, + Attributes.newBuilder() + .set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "foo.example.com") + .build()); + String name = "foo.googleapis.com"; + + AddressResolver mockAddressResolver = mock(AddressResolver.class); + when(mockAddressResolver.resolveAddress(anyString())) + .thenReturn(Collections.singletonList(backendAddr)); + ResourceResolver mockResourceResolver = mock(ResourceResolver.class); + when(mockResourceResolver.resolveTxt(anyString())).thenReturn(Collections.emptyList()); + when(mockResourceResolver.resolveSrv(ArgumentMatchers.any(AddressResolver.class), anyString())) + .thenReturn(Collections.singletonList(balancerAddr)); + + DnsNameResolver resolver = newSrvEnabledResolver(name, 81); + resolver.setAddressResolver(mockAddressResolver); + resolver.setResourceResolver(mockResourceResolver); + + resolver.start(mockListener); + assertEquals(1, fakeExecutor.runDueTasks()); + verify(mockListener).onResult(resultCaptor.capture()); + ResolutionResult result = resultCaptor.getValue(); + InetSocketAddress resolvedBackendAddr = + (InetSocketAddress) Iterables.getOnlyElement( + Iterables.getOnlyElement(result.getAddresses()).getAddresses()); + assertThat(resolvedBackendAddr.getAddress()).isEqualTo(backendAddr); + assertThat(result.getAttributes().get(GrpcAttributes.ATTR_LB_ADDRS)) + .containsExactly(balancerAddr); + } + @Test public void resolveAll_nullResourceResolver() throws Exception { final String hostname = "addr.fake"; diff --git a/core/src/test/java/io/grpc/internal/JndiResourceResolverTest.java b/core/src/test/java/io/grpc/internal/JndiResourceResolverTest.java index c2e9111a50c..965ef8f51cf 100644 --- a/core/src/test/java/io/grpc/internal/JndiResourceResolverTest.java +++ b/core/src/test/java/io/grpc/internal/JndiResourceResolverTest.java @@ -24,7 +24,6 @@ import io.grpc.Attributes; import io.grpc.EquivalentAddressGroup; import io.grpc.internal.DnsNameResolver.AddressResolver; -import io.grpc.internal.GrpcAttributes; import io.grpc.internal.JndiResourceResolverFactory.JndiRecordFetcher; import io.grpc.internal.JndiResourceResolverFactory.JndiResourceResolver; import io.grpc.internal.JndiResourceResolverFactory.RecordFetcher; @@ -81,6 +80,7 @@ public void txtRecordLookup() throws Exception { assertThat(resolver.resolveTxt("service.example.com")).isEqualTo(golden); } + @SuppressWarnings("deprecation") @Test public void srvRecordLookup() throws Exception { AddressResolver addressResolver = mock(AddressResolver.class); diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbConstants.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbConstants.java index 65f4832f540..db5e84f08c6 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbConstants.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbConstants.java @@ -20,6 +20,7 @@ import io.grpc.EquivalentAddressGroup; import io.grpc.ExperimentalApi; import io.grpc.Metadata; +import java.util.List; /** * Constants for the GRPCLB load-balancer. @@ -41,5 +42,15 @@ public final class GrpclbConstants { static final Attributes.Key TOKEN_ATTRIBUTE_KEY = Attributes.Key.create("lb-token"); + @SuppressWarnings("deprecation") + @EquivalentAddressGroup.Attr + static final Attributes.Key> ATTR_LB_ADDRS = + io.grpc.internal.GrpcAttributes.ATTR_LB_ADDRS; + + @SuppressWarnings("deprecation") + @EquivalentAddressGroup.Attr + static final Attributes.Key ATTR_LB_ADDR_AUTHORITY = + io.grpc.internal.GrpcAttributes.ATTR_LB_ADDR_AUTHORITY; + private GrpclbConstants() { } } diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java index cd539bdb6ac..6174fb5627d 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java @@ -91,22 +91,30 @@ public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo n @Override @SuppressWarnings("deprecation") // TODO(creamsoup) migrate to use parsed service config public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { - List updatedServers = resolvedAddresses.getAddresses(); Attributes attributes = resolvedAddresses.getAttributes(); - // LB addresses and backend addresses are treated separately + List newLbAddresses = attributes.get(GrpcAttributes.ATTR_LB_ADDRS); + if ((newLbAddresses == null || newLbAddresses.isEmpty()) + && resolvedAddresses.getAddresses().isEmpty()) { + handleNameResolutionError( + Status.UNAVAILABLE.withDescription("No backend or balancer addresses found")); + return; + } List newLbAddressGroups = new ArrayList<>(); - List newBackendServers = new ArrayList<>(); - for (EquivalentAddressGroup server : updatedServers) { - String lbAddrAuthority = server.getAttributes().get(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY); - if (lbAddrAuthority != null) { - newLbAddressGroups.add(new LbAddressGroup(server, lbAddrAuthority)); - } else { - newBackendServers.add(server); + + if (newLbAddresses != null) { + for (EquivalentAddressGroup lbAddr : newLbAddresses) { + String lbAddrAuthority = lbAddr.getAttributes().get(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY); + if (lbAddrAuthority == null) { + throw new AssertionError( + "This is a bug: LB address " + lbAddr + " does not have an authority."); + } + newLbAddressGroups.add(new LbAddressGroup(lbAddr, lbAddrAuthority)); } } newLbAddressGroups = Collections.unmodifiableList(newLbAddressGroups); - newBackendServers = Collections.unmodifiableList(newBackendServers); + List newBackendServers = + Collections.unmodifiableList(resolvedAddresses.getAddresses()); Map rawLbConfigValue = attributes.get(ATTR_LOAD_BALANCING_CONFIG); Mode newMode = retrieveModeFromLbConfig(rawLbConfigValue, helper.getChannelLogger()); if (!mode.equals(newMode)) { @@ -184,6 +192,11 @@ public void handleNameResolutionError(Status error) { } } + @Override + public boolean canHandleEmptyAddressListFromNameResolution() { + return true; + } + @VisibleForTesting @Nullable GrpclbState getGrpclbState() { diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java index a2679472f6f..4016bc56bc2 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java @@ -794,7 +794,7 @@ private LbAddressGroup flattenLbAddressGroups(List groupList) { // actually used in the normal case. https://github.com/grpc/grpc-java/issues/4618 should allow // this to be more obvious. Attributes attrs = Attributes.newBuilder() - .set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, authority) + .set(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY, authority) .build(); return new LbAddressGroup(flattenEquivalentAddressGroup(eags, attrs), authority); } diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index a88b840d4d4..db424a4a8e3 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -468,9 +468,11 @@ public void loadReporting() { when(args.getHeaders()).thenReturn(headers); long loadReportIntervalMillis = 1983; - List grpclbResolutionList = createResolvedServerAddresses(true); + List grpclbBalancerList = createResolvedBalancerAddresses(1); Attributes grpclbResolutionAttrs = Attributes.EMPTY; - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + deliverResolvedAddresses( + Collections.emptyList(), + grpclbBalancerList, grpclbResolutionAttrs); // Fallback timer is started as soon as address is resolved. assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); @@ -485,7 +487,7 @@ public void loadReporting() { inOrder.verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); // Simulate receiving LB response @@ -593,7 +595,7 @@ public void loadReporting() { .setLoadBalanceToken("token0003") .setNumCalls(1) // pick4 .build()) - .build()); + .build()); PickResult pick5 = picker.pickSubchannel(args); assertSame(subchannel1, pick1.getSubchannel()); @@ -641,7 +643,7 @@ public void loadReporting() { inOrder.verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); // Load reporting is also requested @@ -693,9 +695,11 @@ public void abundantInitialResponse() { PickSubchannelArgs args = mock(PickSubchannelArgs.class); when(args.getHeaders()).thenReturn(headers); - List grpclbResolutionList = createResolvedServerAddresses(true); + List grpclbBalancerList = createResolvedBalancerAddresses(1); Attributes grpclbResolutionAttrs = Attributes.EMPTY; - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + deliverResolvedAddresses( + Collections.emptyList(), + grpclbBalancerList, grpclbResolutionAttrs); assertEquals(1, fakeOobChannels.size()); verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); @@ -731,9 +735,11 @@ public void raceBetweenLoadReportingAndLbStreamClosure() { PickSubchannelArgs args = mock(PickSubchannelArgs.class); when(args.getHeaders()).thenReturn(headers); - List grpclbResolutionList = createResolvedServerAddresses(true); + List grpclbBalancerList = createResolvedBalancerAddresses(1); Attributes grpclbResolutionAttrs = Attributes.EMPTY; - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + deliverResolvedAddresses( + Collections.emptyList(), + grpclbBalancerList, grpclbResolutionAttrs); assertEquals(1, fakeOobChannels.size()); verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); @@ -743,7 +749,7 @@ public void raceBetweenLoadReportingAndLbStreamClosure() { inOrder.verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); // Simulate receiving LB response @@ -780,11 +786,25 @@ private void assertNextReport( eq(LoadBalanceRequest.newBuilder() .setClientStats( ClientStats.newBuilder(expectedReport) - .setTimestamp(Timestamps.fromNanos(fakeClock.getTicker().read())) - .build()) + .setTimestamp(Timestamps.fromNanos(fakeClock.getTicker().read())) + .build()) .build())); } + @Test + public void receiveNoBackendAndBalancerAddress() { + deliverResolvedAddresses( + Collections.emptyList(), + Collections.emptyList(), + Attributes.EMPTY); + verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(picker.dropList).isEmpty(); + Status error = Iterables.getOnlyElement(picker.pickList).picked(new Metadata()).getStatus(); + assertThat(error.getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(error.getDescription()).isEqualTo("No backend or balancer addresses found"); + } + @Test public void nameResolutionFailsThenRecover() { Status error = Status.NOT_FOUND.withDescription("www.google.com not found"); @@ -803,11 +823,12 @@ public void nameResolutionFailsThenRecover() { assertThat(picker.pickList).containsExactly(new ErrorEntry(error)); // Recover with a subsequent success - List resolvedServers = createResolvedServerAddresses(true); - EquivalentAddressGroup eag = resolvedServers.get(0); + List grpclbBalancerList = createResolvedBalancerAddresses(1); + EquivalentAddressGroup eag = grpclbBalancerList.get(0); Attributes resolutionAttrs = Attributes.EMPTY; - deliverResolvedAddresses(resolvedServers, resolutionAttrs); + deliverResolvedAddresses( + Collections.emptyList(), grpclbBalancerList, resolutionAttrs); verify(helper).createOobChannel(eq(eag), eq(lbAuthority(0))); verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); @@ -817,11 +838,13 @@ public void nameResolutionFailsThenRecover() { public void grpclbThenNameResolutionFails() { InOrder inOrder = inOrder(helper, subchannelPool); // Go to GRPCLB first - List grpclbResolutionList = createResolvedServerAddresses(true); + List grpclbBalancerList = createResolvedBalancerAddresses(1); Attributes grpclbResolutionAttrs = Attributes.EMPTY; - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + deliverResolvedAddresses( + Collections.emptyList(), + grpclbBalancerList, grpclbResolutionAttrs); - verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0))); + verify(helper).createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0))); assertEquals(1, fakeOobChannels.size()); ManagedChannel oobChannel = fakeOobChannels.poll(); verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); @@ -854,59 +877,63 @@ public void grpclbThenNameResolutionFails() { @Test public void grpclbUpdatedAddresses_avoidsReconnect() { - List grpclbResolutionList = - createResolvedServerAddresses(true, false); + List backendList = createResolvedBackendAddresses(1); + List grpclbBalancerList = createResolvedBalancerAddresses(1); Attributes grpclbResolutionAttrs = Attributes.EMPTY; - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + deliverResolvedAddresses(backendList, grpclbBalancerList, grpclbResolutionAttrs); - verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0))); + verify(helper).createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0))); ManagedChannel oobChannel = fakeOobChannels.poll(); assertEquals(1, lbRequestObservers.size()); - List grpclbResolutionList2 = - createResolvedServerAddresses(true, false, true); + List backendList2 = createResolvedBackendAddresses(1); + List grpclbBalancerList2 = createResolvedBalancerAddresses(2); EquivalentAddressGroup combinedEag = new EquivalentAddressGroup(Arrays.asList( - grpclbResolutionList2.get(0).getAddresses().get(0), - grpclbResolutionList2.get(2).getAddresses().get(0)), + grpclbBalancerList2.get(0).getAddresses().get(0), + grpclbBalancerList2.get(1).getAddresses().get(0)), lbAttributes(lbAuthority(0))); - deliverResolvedAddresses(grpclbResolutionList2, grpclbResolutionAttrs); + deliverResolvedAddresses(backendList2, grpclbBalancerList2, grpclbResolutionAttrs); verify(helper).updateOobChannelAddresses(eq(oobChannel), eq(combinedEag)); assertEquals(1, lbRequestObservers.size()); // No additional RPC } @Test public void grpclbUpdatedAddresses_reconnectOnAuthorityChange() { - List grpclbResolutionList = - createResolvedServerAddresses(true, false); + List backendList = createResolvedBackendAddresses(1); + List grpclbBalancerList = createResolvedBalancerAddresses(1); Attributes grpclbResolutionAttrs = Attributes.EMPTY; - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + deliverResolvedAddresses(backendList, grpclbBalancerList, grpclbResolutionAttrs); - verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0))); + verify(helper).createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0))); ManagedChannel oobChannel = fakeOobChannels.poll(); assertEquals(1, lbRequestObservers.size()); final String newAuthority = "some-new-authority"; - List grpclbResolutionList2 = - createResolvedServerAddresses(false); - grpclbResolutionList2.add(new EquivalentAddressGroup( - new FakeSocketAddress("somethingNew"), lbAttributes(newAuthority))); - deliverResolvedAddresses(grpclbResolutionList2, grpclbResolutionAttrs); + List backendList2 = createResolvedBackendAddresses(1); + List grpclbBalancerList2 = + Collections.singletonList( + new EquivalentAddressGroup( + new FakeSocketAddress("somethingNew"), lbAttributes(newAuthority))); + deliverResolvedAddresses( + backendList2, grpclbBalancerList2, grpclbResolutionAttrs); assertTrue(oobChannel.isTerminated()); - verify(helper).createOobChannel(eq(grpclbResolutionList2.get(1)), eq(newAuthority)); + verify(helper).createOobChannel(eq(grpclbBalancerList2.get(0)), eq(newAuthority)); assertEquals(2, lbRequestObservers.size()); // An additional RPC } @Test public void grpclbWorking() { InOrder inOrder = inOrder(helper, subchannelPool); - List grpclbResolutionList = createResolvedServerAddresses(true); + List grpclbBalancerList = createResolvedBalancerAddresses(1); Attributes grpclbResolutionAttrs = Attributes.EMPTY; - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + deliverResolvedAddresses( + Collections.emptyList(), + grpclbBalancerList, grpclbResolutionAttrs); // Fallback timer is started as soon as the addresses are resolved. assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); - verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0))); + verify(helper).createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0))); assertEquals(1, fakeOobChannels.size()); ManagedChannel oobChannel = fakeOobChannels.poll(); verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); @@ -915,7 +942,7 @@ public void grpclbWorking() { StreamObserver lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); // Simulate receiving LB response @@ -1174,13 +1201,14 @@ private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) { long loadReportIntervalMillis = 1983; InOrder inOrder = inOrder(helper, subchannelPool); - // Create a resolution list with a mixture of balancer and backend addresses - List resolutionList = - createResolvedServerAddresses(false, true, false); + // Create balancer and backend addresses + List backendList = createResolvedBackendAddresses(2); + List grpclbBalancerList = createResolvedBalancerAddresses(1); Attributes resolutionAttrs = Attributes.EMPTY; - deliverResolvedAddresses(resolutionList, resolutionAttrs); + deliverResolvedAddresses(backendList, grpclbBalancerList, resolutionAttrs); - inOrder.verify(helper).createOobChannel(eq(resolutionList.get(1)), eq(lbAuthority(0))); + inOrder.verify(helper) + .createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0))); // Attempted to connect to balancer assertEquals(1, fakeOobChannels.size()); @@ -1192,7 +1220,7 @@ private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) { verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); // We don't care if these methods have been run. @@ -1214,11 +1242,11 @@ private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) { assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); List fallbackList = - Arrays.asList(resolutionList.get(0), resolutionList.get(2)); + Arrays.asList(backendList.get(0), backendList.get(1)); assertThat(logs).containsExactly( "INFO: Using fallback backends", "INFO: Using RR list=[[[FakeSocketAddress-fake-address-0]/{}], " - + "[[FakeSocketAddress-fake-address-2]/{}]], drop=[null, null]", + + "[[FakeSocketAddress-fake-address-1]/{}]], drop=[null, null]", "INFO: CONNECTING: picks=[BUFFER_ENTRY], drops=[null, null]").inOrder(); // Fall back to the backends from resolver @@ -1228,20 +1256,21 @@ private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) { verify(lbRequestObserver, never()).onCompleted(); } - //////////////////////////////////////////////////////// - // Name resolver sends new list without any backend addr - //////////////////////////////////////////////////////// - resolutionList = createResolvedServerAddresses(true, true); - deliverResolvedAddresses(resolutionList, resolutionAttrs); + ////////////////////////////////////////////////////////////////////// + // Name resolver sends new resolution results without any backend addr + ////////////////////////////////////////////////////////////////////// + grpclbBalancerList = createResolvedBalancerAddresses(2); + deliverResolvedAddresses( + Collections.emptyList(),grpclbBalancerList, resolutionAttrs); // New addresses are updated to the OobChannel inOrder.verify(helper).updateOobChannelAddresses( same(oobChannel), eq(new EquivalentAddressGroup( - Arrays.asList( - resolutionList.get(0).getAddresses().get(0), - resolutionList.get(1).getAddresses().get(0)), - lbAttributes(lbAuthority(0))))); + Arrays.asList( + grpclbBalancerList.get(0).getAddresses().get(0), + grpclbBalancerList.get(1).getAddresses().get(0)), + lbAttributes(lbAuthority(0))))); if (timerExpires) { // Still in fallback logic, except that the backend list is empty @@ -1249,21 +1278,22 @@ private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) { inOrder, Collections.emptyList()); } - ////////////////////////////////////////////////// - // Name resolver sends new list with backend addrs - ////////////////////////////////////////////////// - resolutionList = createResolvedServerAddresses(true, false, false); - deliverResolvedAddresses(resolutionList, resolutionAttrs); + //////////////////////////////////////////////////////////////// + // Name resolver sends new resolution results with backend addrs + //////////////////////////////////////////////////////////////// + backendList = createResolvedBackendAddresses(2); + grpclbBalancerList = createResolvedBalancerAddresses(1); + deliverResolvedAddresses(backendList, grpclbBalancerList, resolutionAttrs); // New LB address is updated to the OobChannel inOrder.verify(helper).updateOobChannelAddresses( same(oobChannel), - eq(resolutionList.get(0))); + eq(grpclbBalancerList.get(0))); if (timerExpires) { // New backend addresses are used for fallback fallbackTestVerifyUseOfFallbackBackendLists( - inOrder, Arrays.asList(resolutionList.get(1), resolutionList.get(2))); + inOrder, Arrays.asList(backendList.get(0), backendList.get(1))); } //////////////////////////////////////////////// @@ -1283,7 +1313,7 @@ private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) { lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); } @@ -1302,8 +1332,9 @@ private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) { /////////////////////////////////////////////////////////////// // New backend addresses from resolver outside of fallback mode /////////////////////////////////////////////////////////////// - resolutionList = createResolvedServerAddresses(true, false); - deliverResolvedAddresses(resolutionList, resolutionAttrs); + backendList = createResolvedBackendAddresses(1); + grpclbBalancerList = createResolvedBalancerAddresses(1); + deliverResolvedAddresses(backendList, grpclbBalancerList, resolutionAttrs); // Will not affect the round robin list at all inOrder.verify(helper, never()) .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); @@ -1317,13 +1348,13 @@ public void grpclbFallback_breakLbStreamBeforeFallbackTimerExpires() { long loadReportIntervalMillis = 1983; InOrder inOrder = inOrder(helper, subchannelPool); - // Create a resolution list with a mixture of balancer and backend addresses - List resolutionList = - createResolvedServerAddresses(false, true, false); + // Create balancer and backend addresses + List backendList = createResolvedBackendAddresses(2); + List grpclbBalancerList = createResolvedBalancerAddresses(1); Attributes resolutionAttrs = Attributes.EMPTY; - deliverResolvedAddresses(resolutionList, resolutionAttrs); + deliverResolvedAddresses(backendList, grpclbBalancerList, resolutionAttrs); - inOrder.verify(helper).createOobChannel(eq(resolutionList.get(1)), eq(lbAuthority(0))); + inOrder.verify(helper).createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0))); // Attempted to connect to balancer assertThat(fakeOobChannels).hasSize(1); @@ -1334,7 +1365,7 @@ public void grpclbFallback_breakLbStreamBeforeFallbackTimerExpires() { verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); // We don't care if these methods have been run. @@ -1353,7 +1384,7 @@ public void grpclbFallback_breakLbStreamBeforeFallbackTimerExpires() { // Fall back to the backends from resolver fallbackTestVerifyUseOfFallbackBackendLists( - inOrder, Arrays.asList(resolutionList.get(0), resolutionList.get(2))); + inOrder, Arrays.asList(backendList.get(0), backendList.get(1))); // A new stream is created verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); @@ -1361,7 +1392,7 @@ public void grpclbFallback_breakLbStreamBeforeFallbackTimerExpires() { lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); } @@ -1369,19 +1400,20 @@ public void grpclbFallback_breakLbStreamBeforeFallbackTimerExpires() { public void grpclbFallback_noBalancerAddress() { InOrder inOrder = inOrder(helper, subchannelPool); - // Create a resolution list with just backend addresses - List resolutionList = createResolvedServerAddresses(false, false); + // Create just backend addresses + List backendList = createResolvedBackendAddresses(2); Attributes resolutionAttrs = Attributes.EMPTY; - deliverResolvedAddresses(resolutionList, resolutionAttrs); + deliverResolvedAddresses( + backendList, Collections.emptyList(), resolutionAttrs); assertThat(logs).containsExactly( - "INFO: Using fallback backends", - "INFO: Using RR list=[[[FakeSocketAddress-fake-address-0]/{}], " - + "[[FakeSocketAddress-fake-address-1]/{}]], drop=[null, null]", - "INFO: CONNECTING: picks=[BUFFER_ENTRY], drops=[null, null]").inOrder(); + "INFO: Using fallback backends", + "INFO: Using RR list=[[[FakeSocketAddress-fake-address-0]/{}], " + + "[[FakeSocketAddress-fake-address-1]/{}]], drop=[null, null]", + "INFO: CONNECTING: picks=[BUFFER_ENTRY], drops=[null, null]").inOrder(); // Fall back to the backends from resolver - fallbackTestVerifyUseOfFallbackBackendLists(inOrder, resolutionList); + fallbackTestVerifyUseOfFallbackBackendLists(inOrder, backendList); // No fallback timeout timer scheduled. assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); @@ -1410,13 +1442,13 @@ private void subtestGrpclbFallbackConnectionLost( long loadReportIntervalMillis = 1983; InOrder inOrder = inOrder(helper, mockLbService, subchannelPool); - // Create a resolution list with a mixture of balancer and backend addresses - List resolutionList = - createResolvedServerAddresses(false, true, false); + // Create balancer and backend addresses + List backendList = createResolvedBackendAddresses(2); + List grpclbBalancerList = createResolvedBalancerAddresses(1); Attributes resolutionAttrs = Attributes.EMPTY; - deliverResolvedAddresses(resolutionList, resolutionAttrs); + deliverResolvedAddresses(backendList, grpclbBalancerList, resolutionAttrs); - inOrder.verify(helper).createOobChannel(eq(resolutionList.get(1)), eq(lbAuthority(0))); + inOrder.verify(helper).createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0))); // Attempted to connect to balancer assertEquals(1, fakeOobChannels.size()); @@ -1428,7 +1460,7 @@ private void subtestGrpclbFallbackConnectionLost( verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); // We don't care if these methods have been run. @@ -1465,7 +1497,7 @@ private void subtestGrpclbFallbackConnectionLost( if (balancerBroken && allSubchannelsBroken) { // Going into fallback subchannels = fallbackTestVerifyUseOfFallbackBackendLists( - inOrder, Arrays.asList(resolutionList.get(0), resolutionList.get(2))); + inOrder, Arrays.asList(backendList.get(0), backendList.get(1))); // When in fallback mode, fallback timer should not be scheduled when all backend // connections are lost @@ -1486,9 +1518,9 @@ private void subtestGrpclbFallbackConnectionLost( if (!(balancerBroken && allSubchannelsBroken)) { verify(subchannelPool, never()).takeOrCreateSubchannel( - eq(resolutionList.get(0)), any(Attributes.class)); + eq(backendList.get(0)), any(Attributes.class)); verify(subchannelPool, never()).takeOrCreateSubchannel( - eq(resolutionList.get(2)), any(Attributes.class)); + eq(backendList.get(1)), any(Attributes.class)); } } @@ -1555,15 +1587,15 @@ private List fallbackTestVerifyUseOfBackendLists( @Test public void grpclbMultipleAuthorities() throws Exception { - List grpclbResolutionList = Arrays.asList( + List backendList = Collections.singletonList( + new EquivalentAddressGroup(new FakeSocketAddress("not-a-lb-address"))); + List grpclbBalancerList = Arrays.asList( new EquivalentAddressGroup( new FakeSocketAddress("fake-address-1"), lbAttributes("fake-authority-1")), new EquivalentAddressGroup( new FakeSocketAddress("fake-address-2"), lbAttributes("fake-authority-2")), - new EquivalentAddressGroup( - new FakeSocketAddress("not-a-lb-address")), new EquivalentAddressGroup( new FakeSocketAddress("fake-address-3"), lbAttributes("fake-authority-1"))); @@ -1574,7 +1606,7 @@ public void grpclbMultipleAuthorities() throws Exception { lbAttributes("fake-authority-1")); // Supporting multiple authorities would be good, one day Attributes grpclbResolutionAttrs = Attributes.EMPTY; - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + deliverResolvedAddresses(backendList, grpclbBalancerList, grpclbResolutionAttrs); verify(helper).createOobChannel(goldenOobChannelEag, "fake-authority-1"); } @@ -1588,9 +1620,11 @@ public void grpclbBalancerStreamClosedAndRetried() throws Exception { .build(); InOrder inOrder = inOrder(mockLbService, backoffPolicyProvider, backoffPolicy1, backoffPolicy2, helper); - List grpclbResolutionList = createResolvedServerAddresses(true); + List grpclbBalancerList = createResolvedBalancerAddresses(1); Attributes grpclbResolutionAttrs = Attributes.EMPTY; - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + deliverResolvedAddresses( + Collections.emptyList(), + grpclbBalancerList, grpclbResolutionAttrs); assertEquals(1, fakeOobChannels.size()); @SuppressWarnings("unused") @@ -1693,11 +1727,13 @@ public void grpclbWorking_pickFirstMode() throws Exception { InOrder inOrder = inOrder(helper); String lbConfig = "{\"childPolicy\" : [ {\"pick_first\" : {}} ]}"; - List grpclbResolutionList = createResolvedServerAddresses(true); + List grpclbBalancerList = createResolvedBalancerAddresses(1); Attributes grpclbResolutionAttrs = Attributes.newBuilder().set( LoadBalancer.ATTR_LOAD_BALANCING_CONFIG, parseJsonObject(lbConfig)).build(); - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + deliverResolvedAddresses( + Collections.emptyList(), + grpclbBalancerList, grpclbResolutionAttrs); assertEquals(1, fakeOobChannels.size()); verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); @@ -1706,7 +1742,7 @@ public void grpclbWorking_pickFirstMode() throws Exception { StreamObserver lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); // Simulate receiving LB response @@ -1722,8 +1758,8 @@ public void grpclbWorking_pickFirstMode() throws Exception { // the new createSubchannel(). inOrder.verify(helper).createSubchannel( eq(Arrays.asList( - new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), - new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002")))), + new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), + new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002")))), any(Attributes.class)); // Initially IDLE @@ -1779,9 +1815,9 @@ public void grpclbWorking_pickFirstMode() throws Exception { assertThat(mockSubchannels).isEmpty(); verify(subchannel).updateAddresses( eq(Arrays.asList( - new EquivalentAddressGroup(backends2.get(0).addr, eagAttrsWithToken("token0001")), - new EquivalentAddressGroup(backends2.get(2).addr, - eagAttrsWithToken("token0004"))))); + new EquivalentAddressGroup(backends2.get(0).addr, eagAttrsWithToken("token0001")), + new EquivalentAddressGroup(backends2.get(2).addr, + eagAttrsWithToken("token0004"))))); inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue(); assertThat(picker4.dropList).containsExactly( @@ -1825,10 +1861,12 @@ public void shutdownWithoutSubchannel_pickFirst() throws Exception { @SuppressWarnings("deprecation") // TODO(creamsoup) use parsed object private void subtestShutdownWithoutSubchannel(String childPolicy) throws Exception { String lbConfig = "{\"childPolicy\" : [ {\"" + childPolicy + "\" : {}} ]}"; - List grpclbResolutionList = createResolvedServerAddresses(true); + List grpclbBalancerList = createResolvedBalancerAddresses(1); Attributes grpclbResolutionAttrs = Attributes.newBuilder().set( LoadBalancer.ATTR_LOAD_BALANCING_CONFIG, parseJsonObject(lbConfig)).build(); - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + deliverResolvedAddresses( + Collections.emptyList(), + grpclbBalancerList, grpclbResolutionAttrs); verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); assertEquals(1, lbRequestObservers.size()); StreamObserver requestObserver = lbRequestObservers.poll(); @@ -1848,12 +1886,12 @@ public void pickFirstMode_fallback() throws Exception { String lbConfig = "{\"childPolicy\" : [ {\"pick_first\" : {}} ]}"; - // Name resolver returns a mix of balancer and backend addresses - List grpclbResolutionList = - createResolvedServerAddresses(false, true, false); + // Name resolver returns balancer and backend addresses + List backendList = createResolvedBackendAddresses(2); + List grpclbBalancerList = createResolvedBalancerAddresses(1); Attributes grpclbResolutionAttrs = Attributes.newBuilder().set( LoadBalancer.ATTR_LOAD_BALANCING_CONFIG, parseJsonObject(lbConfig)).build(); - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + deliverResolvedAddresses(backendList, grpclbBalancerList, grpclbResolutionAttrs); // Attempted to connect to balancer assertEquals(1, fakeOobChannels.size()); @@ -1868,7 +1906,7 @@ public void pickFirstMode_fallback() throws Exception { // TODO(zhangkun83): remove the deprecation suppression on this method once migrated to // the new createSubchannel(). inOrder.verify(helper).createSubchannel( - eq(Arrays.asList(grpclbResolutionList.get(0), grpclbResolutionList.get(2))), + eq(Arrays.asList(backendList.get(0), backendList.get(1))), any(Attributes.class)); assertThat(mockSubchannels).hasSize(1); @@ -1905,9 +1943,9 @@ public void pickFirstMode_fallback() throws Exception { assertThat(mockSubchannels).isEmpty(); verify(subchannel).updateAddresses( eq(Arrays.asList( - new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), - new EquivalentAddressGroup(backends1.get(1).addr, - eagAttrsWithToken("token0002"))))); + new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), + new EquivalentAddressGroup(backends1.get(1).addr, + eagAttrsWithToken("token0002"))))); inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue(); assertThat(picker2.dropList).containsExactly(null, null); @@ -1927,11 +1965,13 @@ public void switchMode() throws Exception { InOrder inOrder = inOrder(helper); String lbConfig = "{\"childPolicy\" : [ {\"round_robin\" : {}} ]}"; - List grpclbResolutionList = createResolvedServerAddresses(true); + List grpclbBalancerList = createResolvedBalancerAddresses(1); Attributes grpclbResolutionAttrs = Attributes.newBuilder().set( LoadBalancer.ATTR_LOAD_BALANCING_CONFIG, parseJsonObject(lbConfig)).build(); - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + deliverResolvedAddresses( + Collections.emptyList(), + grpclbBalancerList, grpclbResolutionAttrs); assertEquals(1, fakeOobChannels.size()); ManagedChannel oobChannel = fakeOobChannels.poll(); @@ -1941,7 +1981,7 @@ public void switchMode() throws Exception { StreamObserver lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); // Simulate receiving LB response @@ -1971,7 +2011,9 @@ public void switchMode() throws Exception { lbConfig = "{\"childPolicy\" : [ {\"pick_first\" : {}} ]}"; grpclbResolutionAttrs = Attributes.newBuilder().set( LoadBalancer.ATTR_LOAD_BALANCING_CONFIG, parseJsonObject(lbConfig)).build(); - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + deliverResolvedAddresses( + Collections.emptyList(), + grpclbBalancerList, grpclbResolutionAttrs); // GrpclbState will be shutdown, and a new one will be created @@ -1989,7 +2031,7 @@ public void switchMode() throws Exception { lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); // Simulate receiving LB response @@ -2003,8 +2045,8 @@ public void switchMode() throws Exception { // the new createSubchannel(). inOrder.verify(helper).createSubchannel( eq(Arrays.asList( - new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), - new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002")))), + new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), + new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002")))), any(Attributes.class)); inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); @@ -2088,19 +2130,18 @@ public void retrieveModeFromLbConfig_badConfigDefaultToRoundRobin() throws Excep @Test public void grpclbWorking_lbSendsFallbackMessage() { InOrder inOrder = inOrder(helper, subchannelPool); - List grpclbResolutionList = - createResolvedServerAddresses(true, true, false, false); - List fallbackEags = grpclbResolutionList.subList(2, 4); + List backendList = createResolvedBackendAddresses(2); + List grpclbBalancerList = createResolvedBalancerAddresses(2); Attributes grpclbResolutionAttrs = Attributes.EMPTY; - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + deliverResolvedAddresses(backendList, grpclbBalancerList, grpclbResolutionAttrs); // Fallback timer is started as soon as the addresses are resolved. assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); List addrs = new ArrayList<>(); - addrs.addAll(grpclbResolutionList.get(0).getAddresses()); - addrs.addAll(grpclbResolutionList.get(1).getAddresses()); - Attributes attr = grpclbResolutionList.get(0).getAttributes(); + addrs.addAll(grpclbBalancerList.get(0).getAddresses()); + addrs.addAll(grpclbBalancerList.get(1).getAddresses()); + Attributes attr = grpclbBalancerList.get(0).getAttributes(); EquivalentAddressGroup oobChannelEag = new EquivalentAddressGroup(addrs, attr); verify(helper).createOobChannel(eq(oobChannelEag), eq(lbAuthority(0))); assertEquals(1, fakeOobChannels.size()); @@ -2206,7 +2247,7 @@ public void grpclbWorking_lbSendsFallbackMessage() { .returnSubchannel(eq(subchannel2), eq(ConnectivityStateInfo.forNonError(READY))); // verify fallback - fallbackTestVerifyUseOfFallbackBackendLists(inOrder, fallbackEags); + fallbackTestVerifyUseOfFallbackBackendLists(inOrder, backendList); assertFalse(oobChannel.isShutdown()); verify(lbRequestObserver, never()).onCompleted(); @@ -2293,48 +2334,62 @@ public void grpclbWorking_lbSendsFallbackMessage() { private void deliverSubchannelState( final Subchannel subchannel, final ConnectivityStateInfo newState) { syncContext.execute(new Runnable() { - @Override - public void run() { - // TODO(zhangkun83): remove the deprecation suppression on this method once migrated to - // the new API. - balancer.handleSubchannelState(subchannel, newState); - } - }); + @Override + public void run() { + // TODO(zhangkun83): remove the deprecation suppression on this method once migrated to + // the new API. + balancer.handleSubchannelState(subchannel, newState); + } + }); } private void deliverNameResolutionError(final Status error) { syncContext.execute(new Runnable() { - @Override - public void run() { - balancer.handleNameResolutionError(error); - } - }); + @Override + public void run() { + balancer.handleNameResolutionError(error); + } + }); } private void deliverResolvedAddresses( - final List addrs, final Attributes attrs) { + final List backendAddrs, + final List balancerAddrs, + Attributes attrs) { + if (!balancerAddrs.isEmpty()) { + attrs = attrs.toBuilder().set(GrpclbConstants.ATTR_LB_ADDRS, balancerAddrs).build(); + } + final Attributes finalAttrs = attrs; syncContext.execute(new Runnable() { - @Override - public void run() { - balancer.handleResolvedAddresses( - ResolvedAddresses.newBuilder().setAddresses(addrs).setAttributes(attrs).build()); - } - }); + @Override + public void run() { + balancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(backendAddrs) + .setAttributes(finalAttrs) + .build()); + } + }); } private GrpclbClientLoadRecorder getLoadRecorder() { return balancer.getGrpclbState().getLoadRecorder(); } - private static List createResolvedServerAddresses(boolean ... isLb) { - ArrayList list = new ArrayList<>(); - for (int i = 0; i < isLb.length; i++) { + private static List createResolvedBackendAddresses(int n) { + List list = new ArrayList<>(); + for (int i = 0; i < n; i++) { + SocketAddress addr = new FakeSocketAddress("fake-address-" + i); + list.add(new EquivalentAddressGroup(addr)); + } + return list; + } + + private static List createResolvedBalancerAddresses(int n) { + List list = new ArrayList<>(); + for (int i = 0; i < n; i++) { SocketAddress addr = new FakeSocketAddress("fake-address-" + i); - EquivalentAddressGroup eag = - new EquivalentAddressGroup( - addr, - isLb[i] ? lbAttributes(lbAuthority(i)) : Attributes.EMPTY); - list.add(eag); + list.add(new EquivalentAddressGroup(addr, lbAttributes(lbAuthority(i)))); } return list; } @@ -2346,7 +2401,7 @@ private static String lbAuthority(int unused) { private static Attributes lbAttributes(String authority) { return Attributes.newBuilder() - .set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, authority) + .set(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY, authority) .build(); } diff --git a/xds/src/main/java/io/grpc/xds/FallbackLb.java b/xds/src/main/java/io/grpc/xds/FallbackLb.java index 41486e56eb7..380a22dba19 100644 --- a/xds/src/main/java/io/grpc/xds/FallbackLb.java +++ b/xds/src/main/java/io/grpc/xds/FallbackLb.java @@ -61,6 +61,7 @@ protected LoadBalancer delegate() { return fallbackPolicyLb; } + @SuppressWarnings("deprecation") @Override public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { Attributes attributes = resolvedAddresses.getAttributes(); @@ -113,6 +114,8 @@ public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { List servers = resolvedAddresses.getAddresses(); // Some addresses in the list may be grpclb-v1 balancer addresses, so if the fallback policy // does not support grpclb-v1 balancer addresses, then we need to exclude them from the list. + // TODO(chengyuanzhang): delete the following logic after changing internal resolver + // to not include grpclb server addresses. if (!newFallbackPolicyName.equals("grpclb") && !newFallbackPolicyName.equals(XDS_POLICY_NAME)) { ImmutableList.Builder backends = ImmutableList.builder(); for (EquivalentAddressGroup eag : resolvedAddresses.getAddresses()) {