diff --git a/xds/src/main/java/io/grpc/xds/EdsLoadBalancer.java b/xds/src/main/java/io/grpc/xds/EdsLoadBalancer.java index b009ea7d530..60652af299f 100644 --- a/xds/src/main/java/io/grpc/xds/EdsLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/EdsLoadBalancer.java @@ -32,7 +32,6 @@ import io.grpc.internal.ObjectPool; import io.grpc.util.GracefulSwitchLoadBalancer; import io.grpc.xds.Bootstrapper.BootstrapInfo; -import io.grpc.xds.Bootstrapper.ServerInfo; import io.grpc.xds.EdsLoadBalancerProvider.EdsConfig; import io.grpc.xds.EnvoyProtoData.DropOverload; import io.grpc.xds.EnvoyProtoData.Locality; @@ -43,7 +42,7 @@ import io.grpc.xds.XdsClient.EndpointUpdate; import io.grpc.xds.XdsClient.EndpointWatcher; import io.grpc.xds.XdsClient.RefCountedXdsClientObjectPool; -import io.grpc.xds.XdsClient.XdsChannelFactory; +import io.grpc.xds.XdsClient.XdsChannel; import io.grpc.xds.XdsClient.XdsClientFactory; import io.grpc.xds.XdsLogger.XdsLogLevel; import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; @@ -129,8 +128,10 @@ public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { xdsClientPool = attributes.get(XdsAttributes.XDS_CLIENT_POOL); if (xdsClientPool == null) { final BootstrapInfo bootstrapInfo; + final XdsChannel channel; try { bootstrapInfo = bootstrapper.readBootstrap(); + channel = channelFactory.createChannel(bootstrapInfo.getServers()); } catch (Exception e) { helper.updateBalancingState( TRANSIENT_FAILURE, @@ -139,24 +140,14 @@ public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { return; } - final List serverList = bootstrapInfo.getServers(); final Node node = bootstrapInfo.getNode(); - if (serverList.isEmpty()) { - helper.updateBalancingState( - TRANSIENT_FAILURE, - new ErrorPicker( - Status.UNAVAILABLE - .withDescription("No management server provided by bootstrap"))); - return; - } XdsClientFactory xdsClientFactory = new XdsClientFactory() { @Override XdsClient createXdsClient() { return new XdsClientImpl( helper.getAuthority(), - serverList, - channelFactory, + channel, node, helper.getSynchronizationContext(), helper.getScheduledExecutorService(), diff --git a/xds/src/main/java/io/grpc/xds/XdsClientWrapperForServerSds.java b/xds/src/main/java/io/grpc/xds/XdsClientWrapperForServerSds.java index e79726e8a1b..08742d419bd 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientWrapperForServerSds.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientWrapperForServerSds.java @@ -33,6 +33,7 @@ import io.grpc.xds.EnvoyServerProtoData.DownstreamTlsContext; import io.grpc.xds.EnvoyServerProtoData.FilterChain; import io.grpc.xds.EnvoyServerProtoData.FilterChainMatch; +import io.grpc.xds.XdsClient.XdsChannel; import io.netty.channel.Channel; import io.netty.channel.epoll.Epoll; import io.netty.channel.epoll.EpollEventLoopGroup; @@ -120,13 +121,14 @@ public boolean hasXdsClient() { public void createXdsClientAndStart() throws IOException { checkState(xdsClient == null, "start() called more than once"); Bootstrapper.BootstrapInfo bootstrapInfo; - List serverList; + XdsChannel channel; try { bootstrapInfo = Bootstrapper.getInstance().readBootstrap(); - serverList = bootstrapInfo.getServers(); + List serverList = bootstrapInfo.getServers(); if (serverList.isEmpty()) { throw new XdsInitializationException("No management server provided by bootstrap"); } + channel = XdsChannelFactory.getInstance().createChannel(serverList); } catch (XdsInitializationException e) { reportError(Status.fromThrowable(e)); throw new IOException(e); @@ -136,8 +138,7 @@ public void createXdsClientAndStart() throws IOException { XdsClientImpl xdsClientImpl = new XdsClientImpl( "", - serverList, - XdsClient.XdsChannelFactory.getInstance(), + channel, node, createSynchronizationContext(), timeService, diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java index 22f74f7f795..5db5568e414 100644 --- a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java +++ b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java @@ -39,8 +39,9 @@ import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl; import io.grpc.xds.XdsClient.ConfigUpdate; import io.grpc.xds.XdsClient.ConfigWatcher; -import io.grpc.xds.XdsClient.XdsClientPoolFactory; +import io.grpc.xds.XdsClient.XdsChannel; import io.grpc.xds.XdsLogger.XdsLogLevel; +import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -70,6 +71,7 @@ final class XdsNameResolver extends NameResolver { private final ServiceConfigParser serviceConfigParser; private final SynchronizationContext syncContext; private final Bootstrapper bootstrapper; + private final XdsChannelFactory channelFactory; private final XdsClientPoolFactory xdsClientPoolFactory; private final ThreadSafeRandom random; private final ConcurrentMap clusterRefs = new ConcurrentHashMap<>(); @@ -85,20 +87,23 @@ final class XdsNameResolver extends NameResolver { SynchronizationContext syncContext, XdsClientPoolFactory xdsClientPoolFactory) { this(name, serviceConfigParser, syncContext, Bootstrapper.getInstance(), - xdsClientPoolFactory, ThreadSafeRandomImpl.instance); + XdsChannelFactory.getInstance(), xdsClientPoolFactory, ThreadSafeRandomImpl.instance); } + @VisibleForTesting XdsNameResolver( String name, ServiceConfigParser serviceConfigParser, SynchronizationContext syncContext, Bootstrapper bootstrapper, + XdsChannelFactory channelFactory, XdsClientPoolFactory xdsClientPoolFactory, ThreadSafeRandom random) { authority = GrpcUtil.checkAuthority(checkNotNull(name, "name")); this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser"); this.syncContext = checkNotNull(syncContext, "syncContext"); this.bootstrapper = checkNotNull(bootstrapper, "bootstrapper"); + this.channelFactory = checkNotNull(channelFactory, "channelFactory"); this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory"); this.random = checkNotNull(random, "random"); logger = XdsLogger.withLogId(InternalLogId.allocate("xds-resolver", name)); @@ -114,14 +119,16 @@ public String getServiceAuthority() { public void start(Listener2 listener) { this.listener = checkNotNull(listener, "listener"); BootstrapInfo bootstrapInfo; + XdsChannel channel; try { bootstrapInfo = bootstrapper.readBootstrap(); + channel = channelFactory.createChannel(bootstrapInfo.getServers()); } catch (Exception e) { listener.onError( - Status.UNAVAILABLE.withDescription("Failed to load xDS bootstrap").withCause(e)); + Status.UNAVAILABLE.withDescription("Failed to initialize xDS").withCause(e)); return; } - xdsClientPool = xdsClientPoolFactory.newXdsClientObjectPool(bootstrapInfo); + xdsClientPool = xdsClientPoolFactory.newXdsClientObjectPool(bootstrapInfo, channel); xdsClient = xdsClientPool.getObject(); xdsClient.watchConfigData(authority, new ConfigWatcherImpl()); } diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java b/xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java index d00929cc38f..ee7c1d68a01 100644 --- a/xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java +++ b/xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java @@ -31,9 +31,8 @@ import io.grpc.internal.ObjectPool; import io.grpc.xds.Bootstrapper.BootstrapInfo; import io.grpc.xds.XdsClient.RefCountedXdsClientObjectPool; -import io.grpc.xds.XdsClient.XdsChannelFactory; +import io.grpc.xds.XdsClient.XdsChannel; import io.grpc.xds.XdsClient.XdsClientFactory; -import io.grpc.xds.XdsClient.XdsClientPoolFactory; import java.net.URI; import java.util.concurrent.ScheduledExecutorService; @@ -64,11 +63,8 @@ public XdsNameResolver newNameResolver(URI targetUri, Args args) { String name = targetPath.substring(1); XdsClientPoolFactory xdsClientPoolFactory = new RefCountedXdsClientPoolFactory( - name, - XdsChannelFactory.getInstance(), - args.getSynchronizationContext(), args.getScheduledExecutorService(), - new ExponentialBackoffPolicy.Provider(), - GrpcUtil.STOPWATCH_SUPPLIER); + name, args.getSynchronizationContext(), args.getScheduledExecutorService(), + new ExponentialBackoffPolicy.Provider(), GrpcUtil.STOPWATCH_SUPPLIER); return new XdsNameResolver( name, args.getServiceConfigParser(), args.getSynchronizationContext(), xdsClientPoolFactory); @@ -95,7 +91,6 @@ protected int priority() { static class RefCountedXdsClientPoolFactory implements XdsClientPoolFactory { private final String serviceName; - private final XdsChannelFactory channelFactory; private final SynchronizationContext syncContext; private final ScheduledExecutorService timeService; private final BackoffPolicy.Provider backoffPolicyProvider; @@ -103,13 +98,11 @@ static class RefCountedXdsClientPoolFactory implements XdsClientPoolFactory { RefCountedXdsClientPoolFactory( String serviceName, - XdsChannelFactory channelFactory, SynchronizationContext syncContext, ScheduledExecutorService timeService, BackoffPolicy.Provider backoffPolicyProvider, Supplier stopwatchSupplier) { this.serviceName = checkNotNull(serviceName, "serviceName"); - this.channelFactory = checkNotNull(channelFactory, "channelFactory"); this.syncContext = checkNotNull(syncContext, "syncContext"); this.timeService = checkNotNull(timeService, "timeService"); this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); @@ -117,16 +110,21 @@ static class RefCountedXdsClientPoolFactory implements XdsClientPoolFactory { } @Override - public ObjectPool newXdsClientObjectPool(final BootstrapInfo bootstrapInfo) { + public ObjectPool newXdsClientObjectPool( + final BootstrapInfo bootstrapInfo, final XdsChannel channel) { XdsClientFactory xdsClientFactory = new XdsClientFactory() { @Override XdsClient createXdsClient() { return new XdsClientImpl( - serviceName, bootstrapInfo.getServers(), channelFactory, bootstrapInfo.getNode(), - syncContext, timeService, backoffPolicyProvider, stopwatchSupplier); + serviceName, channel, bootstrapInfo.getNode(), syncContext, timeService, + backoffPolicyProvider, stopwatchSupplier); } }; return new RefCountedXdsClientObjectPool(xdsClientFactory); } } + + interface XdsClientPoolFactory { + ObjectPool newXdsClientObjectPool(BootstrapInfo bootstrapInfo, XdsChannel channel); + } } diff --git a/xds/src/test/java/io/grpc/xds/EdsLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/EdsLoadBalancerTest.java index 7461159eaa9..192201e7a94 100644 --- a/xds/src/test/java/io/grpc/xds/EdsLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/EdsLoadBalancerTest.java @@ -80,7 +80,6 @@ import io.grpc.xds.LocalityStore.LocalityStoreFactory; import io.grpc.xds.XdsClient.EndpointUpdate; import io.grpc.xds.XdsClient.XdsChannel; -import io.grpc.xds.XdsClient.XdsChannelFactory; import java.net.InetSocketAddress; import java.util.ArrayDeque; import java.util.ArrayList; @@ -240,8 +239,7 @@ public StreamObserver streamAggregatedResources( xdsClientPoolFromResolveAddresses = new FakeXdsClientPool( new XdsClientImpl( SERVICE_AUTHORITY, - serverList, - channelFactory, + new XdsChannel(channel, /* useProtocolV3= */ false), node, syncContext, fakeClock.getScheduledExecutorService(), diff --git a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java index 30658a045fe..797bd45a561 100644 --- a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java @@ -18,6 +18,7 @@ import static com.google.common.truth.Truth.assertThat; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -30,6 +31,7 @@ import io.grpc.CallOptions; import io.grpc.InternalConfigSelector; import io.grpc.InternalConfigSelector.Result; +import io.grpc.ManagedChannel; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor.MethodType; @@ -46,11 +48,13 @@ import io.grpc.internal.PickSubchannelArgsImpl; import io.grpc.testing.TestMethodDescriptors; import io.grpc.xds.Bootstrapper.BootstrapInfo; +import io.grpc.xds.Bootstrapper.ServerInfo; import io.grpc.xds.EnvoyProtoData.ClusterWeight; import io.grpc.xds.EnvoyProtoData.Node; import io.grpc.xds.EnvoyProtoData.Route; import io.grpc.xds.EnvoyProtoData.RouteAction; -import io.grpc.xds.XdsClient.XdsClientPoolFactory; +import io.grpc.xds.XdsClient.XdsChannel; +import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory; import java.io.IOException; import java.util.Arrays; import java.util.Collections; @@ -88,6 +92,12 @@ public ConfigOrError parseServiceConfig(Map rawServiceConfig) { return ConfigOrError.fromConfig(rawServiceConfig); } }; + private final XdsChannelFactory channelFactory = new XdsChannelFactory() { + @Override + XdsChannel createChannel(List servers) throws XdsInitializationException { + return new XdsChannel(mock(ManagedChannel.class), false); + } + }; private final FakeXdsClientPoolFactory xdsClientPoolFactory = new FakeXdsClientPoolFactory(); private final String cluster1 = "cluster-foo.googleapis.com"; private final String cluster2 = "cluster-bar.googleapis.com"; @@ -119,7 +129,7 @@ public BootstrapInfo readBootstrap() { } }; resolver = new XdsNameResolver(AUTHORITY, serviceConfigParser, syncContext, bootstrapper, - xdsClientPoolFactory, mockRandom); + channelFactory, xdsClientPoolFactory, mockRandom); } @Test @@ -131,15 +141,46 @@ public BootstrapInfo readBootstrap() throws XdsInitializationException { } }; resolver = new XdsNameResolver(AUTHORITY, serviceConfigParser, syncContext, bootstrapper, - xdsClientPoolFactory, mockRandom); + channelFactory, xdsClientPoolFactory, mockRandom); resolver.start(mockListener); verify(mockListener).onError(errorCaptor.capture()); Status error = errorCaptor.getValue(); assertThat(error.getCode()).isEqualTo(Code.UNAVAILABLE); - assertThat(error.getDescription()).isEqualTo("Failed to load xDS bootstrap"); + assertThat(error.getDescription()).isEqualTo("Failed to initialize xDS"); assertThat(error.getCause()).hasMessageThat().isEqualTo("Fail to read bootstrap file"); } + @Test + public void resolve_failToCreateXdsChannel() { + Bootstrapper bootstrapper = new Bootstrapper() { + @Override + public BootstrapInfo readBootstrap() { + return new BootstrapInfo( + ImmutableList.of( + new ServerInfo( + "trafficdirector.googleapis.com", + ImmutableList.of(), ImmutableList.of())), + Node.newBuilder().build(), + null); + } + }; + XdsChannelFactory channelFactory = new XdsChannelFactory() { + @Override + XdsChannel createChannel(List servers) throws XdsInitializationException { + throw new XdsInitializationException("No server with supported channel creds found"); + } + }; + resolver = new XdsNameResolver(AUTHORITY, serviceConfigParser, syncContext, bootstrapper, + channelFactory, xdsClientPoolFactory, mockRandom); + resolver.start(mockListener); + verify(mockListener).onError(errorCaptor.capture()); + Status error = errorCaptor.getValue(); + assertThat(error.getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(error.getDescription()).isEqualTo("Failed to initialize xDS"); + assertThat(error.getCause()).hasMessageThat() + .isEqualTo("No server with supported channel creds found"); + } + @SuppressWarnings("unchecked") @Test public void resolve_resourceNotFound() { @@ -472,7 +513,8 @@ public void generateServiceConfig_forMethodTimeoutConfig() throws IOException { private final class FakeXdsClientPoolFactory implements XdsClientPoolFactory { @Override - public ObjectPool newXdsClientObjectPool(BootstrapInfo bootstrapInfo) { + public ObjectPool newXdsClientObjectPool( + BootstrapInfo bootstrapInfo, XdsChannel channel) { return new ObjectPool() { @Override public XdsClient getObject() {