Skip to content

Commit

Permalink
xds: fail to create xDS channel if no server with supported channel c…
Browse files Browse the repository at this point in the history
…reds found (#7400)

Create the xDS channel outside the XdsClient. Throw an XdsInitializationException if the provided server list (parsed from the bootstrap file) can not be used to create such a channel. The exception is caught by the xDS resolver and propagated to the Channel gracefully as a name resolution error.
  • Loading branch information
voidzcy committed Sep 19, 2020
1 parent e6b61ea commit b31d683
Show file tree
Hide file tree
Showing 14 changed files with 284 additions and 227 deletions.
17 changes: 4 additions & 13 deletions xds/src/main/java/io/grpc/xds/EdsLoadBalancer.java
Expand Up @@ -32,7 +32,6 @@
import io.grpc.internal.ObjectPool;
import io.grpc.util.GracefulSwitchLoadBalancer;
import io.grpc.xds.Bootstrapper.BootstrapInfo;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.EdsLoadBalancerProvider.EdsConfig;
import io.grpc.xds.EnvoyProtoData.DropOverload;
import io.grpc.xds.EnvoyProtoData.Locality;
Expand All @@ -43,7 +42,7 @@
import io.grpc.xds.XdsClient.EndpointUpdate;
import io.grpc.xds.XdsClient.EndpointWatcher;
import io.grpc.xds.XdsClient.RefCountedXdsClientObjectPool;
import io.grpc.xds.XdsClient.XdsChannelFactory;
import io.grpc.xds.XdsClient.XdsChannel;
import io.grpc.xds.XdsClient.XdsClientFactory;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
Expand Down Expand Up @@ -129,8 +128,10 @@ public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
xdsClientPool = attributes.get(XdsAttributes.XDS_CLIENT_POOL);
if (xdsClientPool == null) {
final BootstrapInfo bootstrapInfo;
final XdsChannel channel;
try {
bootstrapInfo = bootstrapper.readBootstrap();
channel = channelFactory.createChannel(bootstrapInfo.getServers());
} catch (Exception e) {
helper.updateBalancingState(
TRANSIENT_FAILURE,
Expand All @@ -139,24 +140,14 @@ public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
return;
}

final List<ServerInfo> serverList = bootstrapInfo.getServers();
final Node node = bootstrapInfo.getNode();
if (serverList.isEmpty()) {
helper.updateBalancingState(
TRANSIENT_FAILURE,
new ErrorPicker(
Status.UNAVAILABLE
.withDescription("No management server provided by bootstrap")));
return;
}
XdsClientFactory xdsClientFactory = new XdsClientFactory() {
@Override
XdsClient createXdsClient() {
return
new XdsClientImpl(
helper.getAuthority(),
serverList,
channelFactory,
channel,
node,
helper.getSynchronizationContext(),
helper.getScheduledExecutorService(),
Expand Down
100 changes: 100 additions & 0 deletions xds/src/main/java/io/grpc/xds/XdsChannelFactory.java
@@ -0,0 +1,100 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.xds;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.alts.GoogleDefaultChannelBuilder;
import io.grpc.xds.Bootstrapper.ChannelCreds;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.XdsClient.XdsChannel;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* Factory for creating channels to xDS severs.
*/
abstract class XdsChannelFactory {
@VisibleForTesting
static boolean experimentalV3SupportEnvVar = Boolean.parseBoolean(
System.getenv("GRPC_XDS_EXPERIMENTAL_V3_SUPPORT"));

private static final String XDS_V3_SERVER_FEATURE = "xds_v3";
private static final XdsChannelFactory DEFAULT_INSTANCE = new XdsChannelFactory() {
/**
* Creates a channel to the first server in the given list.
*/
@Override
XdsChannel createChannel(List<ServerInfo> servers) throws XdsInitializationException {
if (servers.isEmpty()) {
throw new XdsInitializationException("No server provided");
}
XdsLogger logger = XdsLogger.withPrefix("xds-client-channel-factory");
ServerInfo serverInfo = servers.get(0);
String serverUri = serverInfo.getServerUri();
logger.log(XdsLogLevel.INFO, "Creating channel to {0}", serverUri);
List<ChannelCreds> channelCredsList = serverInfo.getChannelCredentials();
ManagedChannelBuilder<?> channelBuilder = null;
// Use the first supported channel credentials configuration.
for (ChannelCreds creds : channelCredsList) {
switch (creds.getType()) {
case "google_default":
logger.log(XdsLogLevel.INFO, "Using channel credentials: google_default");
channelBuilder = GoogleDefaultChannelBuilder.forTarget(serverUri);
break;
case "insecure":
logger.log(XdsLogLevel.INFO, "Using channel credentials: insecure");
channelBuilder = ManagedChannelBuilder.forTarget(serverUri).usePlaintext();
break;
case "tls":
logger.log(XdsLogLevel.INFO, "Using channel credentials: tls");
channelBuilder = ManagedChannelBuilder.forTarget(serverUri);
break;
default:
}
if (channelBuilder != null) {
break;
}
}
if (channelBuilder == null) {
throw new XdsInitializationException("No server with supported channel creds found");
}

ManagedChannel channel = channelBuilder
.keepAliveTime(5, TimeUnit.MINUTES)
.build();
boolean useProtocolV3 = experimentalV3SupportEnvVar
&& serverInfo.getServerFeatures().contains(XDS_V3_SERVER_FEATURE);

return new XdsChannel(channel, useProtocolV3);
}
};

static XdsChannelFactory getInstance() {
return DEFAULT_INSTANCE;
}

/**
* Creates a channel to one of the provided management servers.
*
* @throws XdsInitializationException if failed to create a channel with the given list of
* servers.
*/
abstract XdsChannel createChannel(List<ServerInfo> servers) throws XdsInitializationException;
}
68 changes: 0 additions & 68 deletions xds/src/main/java/io/grpc/xds/XdsClient.java
Expand Up @@ -16,7 +16,6 @@

package io.grpc.xds;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;

import com.google.common.annotations.VisibleForTesting;
Expand All @@ -25,29 +24,22 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.alts.GoogleDefaultChannelBuilder;
import io.grpc.internal.ObjectPool;
import io.grpc.xds.Bootstrapper.BootstrapInfo;
import io.grpc.xds.Bootstrapper.ChannelCreds;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.EnvoyProtoData.DropOverload;
import io.grpc.xds.EnvoyProtoData.Locality;
import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints;
import io.grpc.xds.EnvoyProtoData.Route;
import io.grpc.xds.EnvoyServerProtoData.Listener;
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
import io.grpc.xds.LoadStatsManager.LoadStatsStore;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -613,62 +605,6 @@ public synchronized XdsClient returnObject(Object object) {
}
}

/**
* Factory for creating channels to xDS severs.
*/
abstract static class XdsChannelFactory {
@VisibleForTesting
static boolean experimentalV3SupportEnvVar = Boolean.parseBoolean(
System.getenv("GRPC_XDS_EXPERIMENTAL_V3_SUPPORT"));

private static final String XDS_V3_SERVER_FEATURE = "xds_v3";
private static final XdsChannelFactory DEFAULT_INSTANCE = new XdsChannelFactory() {
/**
* Creates a channel to the first server in the given list.
*/
@Override
XdsChannel createChannel(List<ServerInfo> servers) {
checkArgument(!servers.isEmpty(), "No management server provided.");
XdsLogger logger = XdsLogger.withPrefix("xds-client-channel-factory");
ServerInfo serverInfo = servers.get(0);
String serverUri = serverInfo.getServerUri();
logger.log(XdsLogLevel.INFO, "Creating channel to {0}", serverUri);
List<ChannelCreds> channelCredsList = serverInfo.getChannelCredentials();
ManagedChannelBuilder<?> channelBuilder = null;
// Use the first supported channel credentials configuration.
// Currently, only "google_default" is supported.
for (ChannelCreds creds : channelCredsList) {
if (creds.getType().equals("google_default")) {
logger.log(XdsLogLevel.INFO, "Using channel credentials: google_default");
channelBuilder = GoogleDefaultChannelBuilder.forTarget(serverUri);
break;
}
}
if (channelBuilder == null) {
logger.log(XdsLogLevel.INFO, "Using default channel credentials");
channelBuilder = ManagedChannelBuilder.forTarget(serverUri);
}

ManagedChannel channel = channelBuilder
.keepAliveTime(5, TimeUnit.MINUTES)
.build();
boolean useProtocolV3 = experimentalV3SupportEnvVar
&& serverInfo.getServerFeatures().contains(XDS_V3_SERVER_FEATURE);

return new XdsChannel(channel, useProtocolV3);
}
};

static XdsChannelFactory getInstance() {
return DEFAULT_INSTANCE;
}

/**
* Creates a channel to one of the provided management servers.
*/
abstract XdsChannel createChannel(List<ServerInfo> servers);
}

static final class XdsChannel {
private final ManagedChannel managedChannel;
private final boolean useProtocolV3;
Expand All @@ -687,8 +623,4 @@ boolean isUseProtocolV3() {
return useProtocolV3;
}
}

interface XdsClientPoolFactory {
ObjectPool<XdsClient> newXdsClientObjectPool(BootstrapInfo bootstrapInfo);
}
}
9 changes: 2 additions & 7 deletions xds/src/main/java/io/grpc/xds/XdsClientImpl.java
Expand Up @@ -53,7 +53,6 @@
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.BackoffPolicy;
import io.grpc.stub.StreamObserver;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.EnvoyProtoData.DropOverload;
import io.grpc.xds.EnvoyProtoData.Locality;
import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints;
Expand Down Expand Up @@ -204,18 +203,14 @@ final class XdsClientImpl extends XdsClient {

XdsClientImpl(
String targetName,
List<ServerInfo> servers, // list of management servers
XdsChannelFactory channelFactory,
XdsChannel channel,
Node node,
SynchronizationContext syncContext,
ScheduledExecutorService timeService,
BackoffPolicy.Provider backoffPolicyProvider,
Supplier<Stopwatch> stopwatchSupplier) {
this.targetName = checkNotNull(targetName, "targetName");
XdsChannel xdsChannel =
checkNotNull(channelFactory, "channelFactory")
.createChannel(checkNotNull(servers, "servers"));
this.xdsChannel = xdsChannel;
this.xdsChannel = checkNotNull(channel, "channel");
this.node = checkNotNull(node, "node");
this.syncContext = checkNotNull(syncContext, "syncContext");
this.timeService = checkNotNull(timeService, "timeService");
Expand Down
Expand Up @@ -33,6 +33,7 @@
import io.grpc.xds.EnvoyServerProtoData.DownstreamTlsContext;
import io.grpc.xds.EnvoyServerProtoData.FilterChain;
import io.grpc.xds.EnvoyServerProtoData.FilterChainMatch;
import io.grpc.xds.XdsClient.XdsChannel;
import io.netty.channel.Channel;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
Expand Down Expand Up @@ -120,13 +121,14 @@ public boolean hasXdsClient() {
public void createXdsClientAndStart() throws IOException {
checkState(xdsClient == null, "start() called more than once");
Bootstrapper.BootstrapInfo bootstrapInfo;
List<Bootstrapper.ServerInfo> serverList;
XdsChannel channel;
try {
bootstrapInfo = Bootstrapper.getInstance().readBootstrap();
serverList = bootstrapInfo.getServers();
List<Bootstrapper.ServerInfo> serverList = bootstrapInfo.getServers();
if (serverList.isEmpty()) {
throw new XdsInitializationException("No management server provided by bootstrap");
}
channel = XdsChannelFactory.getInstance().createChannel(serverList);
} catch (XdsInitializationException e) {
reportError(Status.fromThrowable(e));
throw new IOException(e);
Expand All @@ -136,8 +138,7 @@ public void createXdsClientAndStart() throws IOException {
XdsClientImpl xdsClientImpl =
new XdsClientImpl(
"",
serverList,
XdsClient.XdsChannelFactory.getInstance(),
channel,
node,
createSynchronizationContext(),
timeService,
Expand Down
15 changes: 11 additions & 4 deletions xds/src/main/java/io/grpc/xds/XdsNameResolver.java
Expand Up @@ -39,8 +39,9 @@
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
import io.grpc.xds.XdsClient.ConfigUpdate;
import io.grpc.xds.XdsClient.ConfigWatcher;
import io.grpc.xds.XdsClient.XdsClientPoolFactory;
import io.grpc.xds.XdsClient.XdsChannel;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -70,6 +71,7 @@ final class XdsNameResolver extends NameResolver {
private final ServiceConfigParser serviceConfigParser;
private final SynchronizationContext syncContext;
private final Bootstrapper bootstrapper;
private final XdsChannelFactory channelFactory;
private final XdsClientPoolFactory xdsClientPoolFactory;
private final ThreadSafeRandom random;
private final ConcurrentMap<String, AtomicInteger> clusterRefs = new ConcurrentHashMap<>();
Expand All @@ -85,20 +87,23 @@ final class XdsNameResolver extends NameResolver {
SynchronizationContext syncContext,
XdsClientPoolFactory xdsClientPoolFactory) {
this(name, serviceConfigParser, syncContext, Bootstrapper.getInstance(),
xdsClientPoolFactory, ThreadSafeRandomImpl.instance);
XdsChannelFactory.getInstance(), xdsClientPoolFactory, ThreadSafeRandomImpl.instance);
}

@VisibleForTesting
XdsNameResolver(
String name,
ServiceConfigParser serviceConfigParser,
SynchronizationContext syncContext,
Bootstrapper bootstrapper,
XdsChannelFactory channelFactory,
XdsClientPoolFactory xdsClientPoolFactory,
ThreadSafeRandom random) {
authority = GrpcUtil.checkAuthority(checkNotNull(name, "name"));
this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser");
this.syncContext = checkNotNull(syncContext, "syncContext");
this.bootstrapper = checkNotNull(bootstrapper, "bootstrapper");
this.channelFactory = checkNotNull(channelFactory, "channelFactory");
this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory");
this.random = checkNotNull(random, "random");
logger = XdsLogger.withLogId(InternalLogId.allocate("xds-resolver", name));
Expand All @@ -114,14 +119,16 @@ public String getServiceAuthority() {
public void start(Listener2 listener) {
this.listener = checkNotNull(listener, "listener");
BootstrapInfo bootstrapInfo;
XdsChannel channel;
try {
bootstrapInfo = bootstrapper.readBootstrap();
channel = channelFactory.createChannel(bootstrapInfo.getServers());
} catch (Exception e) {
listener.onError(
Status.UNAVAILABLE.withDescription("Failed to load xDS bootstrap").withCause(e));
Status.UNAVAILABLE.withDescription("Failed to initialize xDS").withCause(e));
return;
}
xdsClientPool = xdsClientPoolFactory.newXdsClientObjectPool(bootstrapInfo);
xdsClientPool = xdsClientPoolFactory.newXdsClientObjectPool(bootstrapInfo, channel);
xdsClient = xdsClientPool.getObject();
xdsClient.watchConfigData(authority, new ConfigWatcherImpl());
}
Expand Down

0 comments on commit b31d683

Please sign in to comment.