Skip to content

Commit

Permalink
Create the xDS channel outside XdsClient.
Browse files Browse the repository at this point in the history
  • Loading branch information
voidzcy committed Sep 15, 2020
1 parent 603d929 commit 612b791
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 42 deletions.
17 changes: 4 additions & 13 deletions xds/src/main/java/io/grpc/xds/EdsLoadBalancer.java
Expand Up @@ -32,7 +32,6 @@
import io.grpc.internal.ObjectPool;
import io.grpc.util.GracefulSwitchLoadBalancer;
import io.grpc.xds.Bootstrapper.BootstrapInfo;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.EdsLoadBalancerProvider.EdsConfig;
import io.grpc.xds.EnvoyProtoData.DropOverload;
import io.grpc.xds.EnvoyProtoData.Locality;
Expand All @@ -43,7 +42,7 @@
import io.grpc.xds.XdsClient.EndpointUpdate;
import io.grpc.xds.XdsClient.EndpointWatcher;
import io.grpc.xds.XdsClient.RefCountedXdsClientObjectPool;
import io.grpc.xds.XdsClient.XdsChannelFactory;
import io.grpc.xds.XdsClient.XdsChannel;
import io.grpc.xds.XdsClient.XdsClientFactory;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
Expand Down Expand Up @@ -129,8 +128,10 @@ public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
xdsClientPool = attributes.get(XdsAttributes.XDS_CLIENT_POOL);
if (xdsClientPool == null) {
final BootstrapInfo bootstrapInfo;
final XdsChannel channel;
try {
bootstrapInfo = bootstrapper.readBootstrap();
channel = channelFactory.createChannel(bootstrapInfo.getServers());
} catch (Exception e) {
helper.updateBalancingState(
TRANSIENT_FAILURE,
Expand All @@ -139,24 +140,14 @@ public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
return;
}

final List<ServerInfo> serverList = bootstrapInfo.getServers();
final Node node = bootstrapInfo.getNode();
if (serverList.isEmpty()) {
helper.updateBalancingState(
TRANSIENT_FAILURE,
new ErrorPicker(
Status.UNAVAILABLE
.withDescription("No management server provided by bootstrap")));
return;
}
XdsClientFactory xdsClientFactory = new XdsClientFactory() {
@Override
XdsClient createXdsClient() {
return
new XdsClientImpl(
helper.getAuthority(),
serverList,
channelFactory,
channel,
node,
helper.getSynchronizationContext(),
helper.getScheduledExecutorService(),
Expand Down
Expand Up @@ -33,6 +33,7 @@
import io.grpc.xds.EnvoyServerProtoData.DownstreamTlsContext;
import io.grpc.xds.EnvoyServerProtoData.FilterChain;
import io.grpc.xds.EnvoyServerProtoData.FilterChainMatch;
import io.grpc.xds.XdsClient.XdsChannel;
import io.netty.channel.Channel;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
Expand Down Expand Up @@ -120,13 +121,14 @@ public boolean hasXdsClient() {
public void createXdsClientAndStart() throws IOException {
checkState(xdsClient == null, "start() called more than once");
Bootstrapper.BootstrapInfo bootstrapInfo;
List<Bootstrapper.ServerInfo> serverList;
XdsChannel channel;
try {
bootstrapInfo = Bootstrapper.getInstance().readBootstrap();
serverList = bootstrapInfo.getServers();
List<Bootstrapper.ServerInfo> serverList = bootstrapInfo.getServers();
if (serverList.isEmpty()) {
throw new XdsInitializationException("No management server provided by bootstrap");
}
channel = XdsChannelFactory.getInstance().createChannel(serverList);
} catch (XdsInitializationException e) {
reportError(Status.fromThrowable(e));
throw new IOException(e);
Expand All @@ -136,8 +138,7 @@ public void createXdsClientAndStart() throws IOException {
XdsClientImpl xdsClientImpl =
new XdsClientImpl(
"",
serverList,
XdsClient.XdsChannelFactory.getInstance(),
channel,
node,
createSynchronizationContext(),
timeService,
Expand Down
15 changes: 11 additions & 4 deletions xds/src/main/java/io/grpc/xds/XdsNameResolver.java
Expand Up @@ -39,8 +39,9 @@
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
import io.grpc.xds.XdsClient.ConfigUpdate;
import io.grpc.xds.XdsClient.ConfigWatcher;
import io.grpc.xds.XdsClient.XdsClientPoolFactory;
import io.grpc.xds.XdsClient.XdsChannel;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -70,6 +71,7 @@ final class XdsNameResolver extends NameResolver {
private final ServiceConfigParser serviceConfigParser;
private final SynchronizationContext syncContext;
private final Bootstrapper bootstrapper;
private final XdsChannelFactory channelFactory;
private final XdsClientPoolFactory xdsClientPoolFactory;
private final ThreadSafeRandom random;
private final ConcurrentMap<String, AtomicInteger> clusterRefs = new ConcurrentHashMap<>();
Expand All @@ -85,20 +87,23 @@ final class XdsNameResolver extends NameResolver {
SynchronizationContext syncContext,
XdsClientPoolFactory xdsClientPoolFactory) {
this(name, serviceConfigParser, syncContext, Bootstrapper.getInstance(),
xdsClientPoolFactory, ThreadSafeRandomImpl.instance);
XdsChannelFactory.getInstance(), xdsClientPoolFactory, ThreadSafeRandomImpl.instance);
}

@VisibleForTesting
XdsNameResolver(
String name,
ServiceConfigParser serviceConfigParser,
SynchronizationContext syncContext,
Bootstrapper bootstrapper,
XdsChannelFactory channelFactory,
XdsClientPoolFactory xdsClientPoolFactory,
ThreadSafeRandom random) {
authority = GrpcUtil.checkAuthority(checkNotNull(name, "name"));
this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser");
this.syncContext = checkNotNull(syncContext, "syncContext");
this.bootstrapper = checkNotNull(bootstrapper, "bootstrapper");
this.channelFactory = checkNotNull(channelFactory, "channelFactory");
this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory");
this.random = checkNotNull(random, "random");
logger = XdsLogger.withLogId(InternalLogId.allocate("xds-resolver", name));
Expand All @@ -114,14 +119,16 @@ public String getServiceAuthority() {
public void start(Listener2 listener) {
this.listener = checkNotNull(listener, "listener");
BootstrapInfo bootstrapInfo;
XdsChannel channel;
try {
bootstrapInfo = bootstrapper.readBootstrap();
channel = channelFactory.createChannel(bootstrapInfo.getServers());
} catch (Exception e) {
listener.onError(
Status.UNAVAILABLE.withDescription("Failed to load xDS bootstrap").withCause(e));
Status.UNAVAILABLE.withDescription("Failed to initialize xDS").withCause(e));
return;
}
xdsClientPool = xdsClientPoolFactory.newXdsClientObjectPool(bootstrapInfo);
xdsClientPool = xdsClientPoolFactory.newXdsClientObjectPool(bootstrapInfo, channel);
xdsClient = xdsClientPool.getObject();
xdsClient.watchConfigData(authority, new ConfigWatcherImpl());
}
Expand Down
24 changes: 11 additions & 13 deletions xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java
Expand Up @@ -31,9 +31,8 @@
import io.grpc.internal.ObjectPool;
import io.grpc.xds.Bootstrapper.BootstrapInfo;
import io.grpc.xds.XdsClient.RefCountedXdsClientObjectPool;
import io.grpc.xds.XdsClient.XdsChannelFactory;
import io.grpc.xds.XdsClient.XdsChannel;
import io.grpc.xds.XdsClient.XdsClientFactory;
import io.grpc.xds.XdsClient.XdsClientPoolFactory;
import java.net.URI;
import java.util.concurrent.ScheduledExecutorService;

Expand Down Expand Up @@ -64,11 +63,8 @@ public XdsNameResolver newNameResolver(URI targetUri, Args args) {
String name = targetPath.substring(1);
XdsClientPoolFactory xdsClientPoolFactory =
new RefCountedXdsClientPoolFactory(
name,
XdsChannelFactory.getInstance(),
args.getSynchronizationContext(), args.getScheduledExecutorService(),
new ExponentialBackoffPolicy.Provider(),
GrpcUtil.STOPWATCH_SUPPLIER);
name, args.getSynchronizationContext(), args.getScheduledExecutorService(),
new ExponentialBackoffPolicy.Provider(), GrpcUtil.STOPWATCH_SUPPLIER);
return new XdsNameResolver(
name, args.getServiceConfigParser(),
args.getSynchronizationContext(), xdsClientPoolFactory);
Expand All @@ -95,38 +91,40 @@ protected int priority() {

static class RefCountedXdsClientPoolFactory implements XdsClientPoolFactory {
private final String serviceName;
private final XdsChannelFactory channelFactory;
private final SynchronizationContext syncContext;
private final ScheduledExecutorService timeService;
private final BackoffPolicy.Provider backoffPolicyProvider;
private final Supplier<Stopwatch> stopwatchSupplier;

RefCountedXdsClientPoolFactory(
String serviceName,
XdsChannelFactory channelFactory,
SynchronizationContext syncContext,
ScheduledExecutorService timeService,
BackoffPolicy.Provider backoffPolicyProvider,
Supplier<Stopwatch> stopwatchSupplier) {
this.serviceName = checkNotNull(serviceName, "serviceName");
this.channelFactory = checkNotNull(channelFactory, "channelFactory");
this.syncContext = checkNotNull(syncContext, "syncContext");
this.timeService = checkNotNull(timeService, "timeService");
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
}

@Override
public ObjectPool<XdsClient> newXdsClientObjectPool(final BootstrapInfo bootstrapInfo) {
public ObjectPool<XdsClient> newXdsClientObjectPool(
final BootstrapInfo bootstrapInfo, final XdsChannel channel) {
XdsClientFactory xdsClientFactory = new XdsClientFactory() {
@Override
XdsClient createXdsClient() {
return new XdsClientImpl(
serviceName, bootstrapInfo.getServers(), channelFactory, bootstrapInfo.getNode(),
syncContext, timeService, backoffPolicyProvider, stopwatchSupplier);
serviceName, channel, bootstrapInfo.getNode(), syncContext, timeService,
backoffPolicyProvider, stopwatchSupplier);
}
};
return new RefCountedXdsClientObjectPool(xdsClientFactory);
}
}

interface XdsClientPoolFactory {
ObjectPool<XdsClient> newXdsClientObjectPool(BootstrapInfo bootstrapInfo, XdsChannel channel);
}
}
4 changes: 1 addition & 3 deletions xds/src/test/java/io/grpc/xds/EdsLoadBalancerTest.java
Expand Up @@ -80,7 +80,6 @@
import io.grpc.xds.LocalityStore.LocalityStoreFactory;
import io.grpc.xds.XdsClient.EndpointUpdate;
import io.grpc.xds.XdsClient.XdsChannel;
import io.grpc.xds.XdsClient.XdsChannelFactory;
import java.net.InetSocketAddress;
import java.util.ArrayDeque;
import java.util.ArrayList;
Expand Down Expand Up @@ -240,8 +239,7 @@ public StreamObserver<DiscoveryRequest> streamAggregatedResources(
xdsClientPoolFromResolveAddresses = new FakeXdsClientPool(
new XdsClientImpl(
SERVICE_AUTHORITY,
serverList,
channelFactory,
new XdsChannel(channel, /* useProtocolV3= */ false),
node,
syncContext,
fakeClock.getScheduledExecutorService(),
Expand Down
52 changes: 47 additions & 5 deletions xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -30,6 +31,7 @@
import io.grpc.CallOptions;
import io.grpc.InternalConfigSelector;
import io.grpc.InternalConfigSelector.Result;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
Expand All @@ -46,11 +48,13 @@
import io.grpc.internal.PickSubchannelArgsImpl;
import io.grpc.testing.TestMethodDescriptors;
import io.grpc.xds.Bootstrapper.BootstrapInfo;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.EnvoyProtoData.ClusterWeight;
import io.grpc.xds.EnvoyProtoData.Node;
import io.grpc.xds.EnvoyProtoData.Route;
import io.grpc.xds.EnvoyProtoData.RouteAction;
import io.grpc.xds.XdsClient.XdsClientPoolFactory;
import io.grpc.xds.XdsClient.XdsChannel;
import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -88,6 +92,12 @@ public ConfigOrError parseServiceConfig(Map<String, ?> rawServiceConfig) {
return ConfigOrError.fromConfig(rawServiceConfig);
}
};
private final XdsChannelFactory channelFactory = new XdsChannelFactory() {
@Override
XdsChannel createChannel(List<ServerInfo> servers) throws XdsInitializationException {
return new XdsChannel(mock(ManagedChannel.class), false);
}
};
private final FakeXdsClientPoolFactory xdsClientPoolFactory = new FakeXdsClientPoolFactory();
private final String cluster1 = "cluster-foo.googleapis.com";
private final String cluster2 = "cluster-bar.googleapis.com";
Expand Down Expand Up @@ -119,7 +129,7 @@ public BootstrapInfo readBootstrap() {
}
};
resolver = new XdsNameResolver(AUTHORITY, serviceConfigParser, syncContext, bootstrapper,
xdsClientPoolFactory, mockRandom);
channelFactory, xdsClientPoolFactory, mockRandom);
}

@Test
Expand All @@ -131,15 +141,46 @@ public BootstrapInfo readBootstrap() throws XdsInitializationException {
}
};
resolver = new XdsNameResolver(AUTHORITY, serviceConfigParser, syncContext, bootstrapper,
xdsClientPoolFactory, mockRandom);
channelFactory, xdsClientPoolFactory, mockRandom);
resolver.start(mockListener);
verify(mockListener).onError(errorCaptor.capture());
Status error = errorCaptor.getValue();
assertThat(error.getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(error.getDescription()).isEqualTo("Failed to load xDS bootstrap");
assertThat(error.getDescription()).isEqualTo("Failed to initialize xDS");
assertThat(error.getCause()).hasMessageThat().isEqualTo("Fail to read bootstrap file");
}

@Test
public void resolve_failToCreateXdsChannel() {
Bootstrapper bootstrapper = new Bootstrapper() {
@Override
public BootstrapInfo readBootstrap() {
return new BootstrapInfo(
ImmutableList.of(
new ServerInfo(
"trafficdirector.googleapis.com",
ImmutableList.<ChannelCreds>of(), ImmutableList.<String>of())),
Node.newBuilder().build(),
null);
}
};
XdsChannelFactory channelFactory = new XdsChannelFactory() {
@Override
XdsChannel createChannel(List<ServerInfo> servers) throws XdsInitializationException {
throw new XdsInitializationException("No server with supported channel creds found");
}
};
resolver = new XdsNameResolver(AUTHORITY, serviceConfigParser, syncContext, bootstrapper,
channelFactory, xdsClientPoolFactory, mockRandom);
resolver.start(mockListener);
verify(mockListener).onError(errorCaptor.capture());
Status error = errorCaptor.getValue();
assertThat(error.getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(error.getDescription()).isEqualTo("Failed to initialize xDS");
assertThat(error.getCause()).hasMessageThat()
.isEqualTo("No server with supported channel creds found");
}

@SuppressWarnings("unchecked")
@Test
public void resolve_resourceNotFound() {
Expand Down Expand Up @@ -472,7 +513,8 @@ public void generateServiceConfig_forMethodTimeoutConfig() throws IOException {

private final class FakeXdsClientPoolFactory implements XdsClientPoolFactory {
@Override
public ObjectPool<XdsClient> newXdsClientObjectPool(BootstrapInfo bootstrapInfo) {
public ObjectPool<XdsClient> newXdsClientObjectPool(
BootstrapInfo bootstrapInfo, XdsChannel channel) {
return new ObjectPool<XdsClient>() {
@Override
public XdsClient getObject() {
Expand Down

0 comments on commit 612b791

Please sign in to comment.