Skip to content

Commit

Permalink
Refactor XdsClient with xDS channel injected.
Browse files Browse the repository at this point in the history
  • Loading branch information
voidzcy committed Sep 8, 2020
1 parent aaa9be9 commit 311ed9d
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 189 deletions.
68 changes: 0 additions & 68 deletions xds/src/main/java/io/grpc/xds/XdsClient.java
Expand Up @@ -16,7 +16,6 @@

package io.grpc.xds;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;

import com.google.common.annotations.VisibleForTesting;
Expand All @@ -25,29 +24,22 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.alts.GoogleDefaultChannelBuilder;
import io.grpc.internal.ObjectPool;
import io.grpc.xds.Bootstrapper.BootstrapInfo;
import io.grpc.xds.Bootstrapper.ChannelCreds;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.EnvoyProtoData.DropOverload;
import io.grpc.xds.EnvoyProtoData.Locality;
import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints;
import io.grpc.xds.EnvoyProtoData.Route;
import io.grpc.xds.EnvoyServerProtoData.Listener;
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
import io.grpc.xds.LoadStatsManager.LoadStatsStore;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -613,62 +605,6 @@ public synchronized XdsClient returnObject(Object object) {
}
}

/**
* Factory for creating channels to xDS severs.
*/
abstract static class XdsChannelFactory {
@VisibleForTesting
static boolean experimentalV3SupportEnvVar = Boolean.parseBoolean(
System.getenv("GRPC_XDS_EXPERIMENTAL_V3_SUPPORT"));

private static final String XDS_V3_SERVER_FEATURE = "xds_v3";
private static final XdsChannelFactory DEFAULT_INSTANCE = new XdsChannelFactory() {
/**
* Creates a channel to the first server in the given list.
*/
@Override
XdsChannel createChannel(List<ServerInfo> servers) {
checkArgument(!servers.isEmpty(), "No management server provided.");
XdsLogger logger = XdsLogger.withPrefix("xds-client-channel-factory");
ServerInfo serverInfo = servers.get(0);
String serverUri = serverInfo.getServerUri();
logger.log(XdsLogLevel.INFO, "Creating channel to {0}", serverUri);
List<ChannelCreds> channelCredsList = serverInfo.getChannelCredentials();
ManagedChannelBuilder<?> channelBuilder = null;
// Use the first supported channel credentials configuration.
// Currently, only "google_default" is supported.
for (ChannelCreds creds : channelCredsList) {
if (creds.getType().equals("google_default")) {
logger.log(XdsLogLevel.INFO, "Using channel credentials: google_default");
channelBuilder = GoogleDefaultChannelBuilder.forTarget(serverUri);
break;
}
}
if (channelBuilder == null) {
logger.log(XdsLogLevel.INFO, "Using default channel credentials");
channelBuilder = ManagedChannelBuilder.forTarget(serverUri);
}

ManagedChannel channel = channelBuilder
.keepAliveTime(5, TimeUnit.MINUTES)
.build();
boolean useProtocolV3 = experimentalV3SupportEnvVar
&& serverInfo.getServerFeatures().contains(XDS_V3_SERVER_FEATURE);

return new XdsChannel(channel, useProtocolV3);
}
};

static XdsChannelFactory getInstance() {
return DEFAULT_INSTANCE;
}

/**
* Creates a channel to one of the provided management servers.
*/
abstract XdsChannel createChannel(List<ServerInfo> servers);
}

static final class XdsChannel {
private final ManagedChannel managedChannel;
private final boolean useProtocolV3;
Expand All @@ -687,8 +623,4 @@ boolean isUseProtocolV3() {
return useProtocolV3;
}
}

interface XdsClientPoolFactory {
ObjectPool<XdsClient> newXdsClientObjectPool(BootstrapInfo bootstrapInfo);
}
}
8 changes: 2 additions & 6 deletions xds/src/main/java/io/grpc/xds/XdsClientImpl.java
Expand Up @@ -196,18 +196,14 @@ final class XdsClientImpl extends XdsClient {

XdsClientImpl(
String targetName,
List<ServerInfo> servers, // list of management servers
XdsChannelFactory channelFactory,
XdsChannel channel,
Node node,
SynchronizationContext syncContext,
ScheduledExecutorService timeService,
BackoffPolicy.Provider backoffPolicyProvider,
Supplier<Stopwatch> stopwatchSupplier) {
this.targetName = checkNotNull(targetName, "targetName");
XdsChannel xdsChannel =
checkNotNull(channelFactory, "channelFactory")
.createChannel(checkNotNull(servers, "servers"));
this.xdsChannel = xdsChannel;
this.xdsChannel = checkNotNull(channel, "channel");
this.node = checkNotNull(node, "node");
this.syncContext = checkNotNull(syncContext, "syncContext");
this.timeService = checkNotNull(timeService, "timeService");
Expand Down
14 changes: 10 additions & 4 deletions xds/src/main/java/io/grpc/xds/XdsNameResolver2.java
Expand Up @@ -39,8 +39,9 @@
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
import io.grpc.xds.XdsClient.ConfigUpdate;
import io.grpc.xds.XdsClient.ConfigWatcher;
import io.grpc.xds.XdsClient.XdsClientPoolFactory;
import io.grpc.xds.XdsClient.XdsChannel;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsNameResolverProvider2.XdsClientPoolFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -70,6 +71,7 @@ final class XdsNameResolver2 extends NameResolver {
private final ServiceConfigParser serviceConfigParser;
private final SynchronizationContext syncContext;
private final Bootstrapper bootstrapper;
private final XdsChannelFactory channelFactory;
private final XdsClientPoolFactory xdsClientPoolFactory;
private final ThreadSafeRandom random;
private final ConcurrentMap<String, AtomicInteger> clusterRefs = new ConcurrentHashMap<>();
Expand All @@ -85,20 +87,22 @@ final class XdsNameResolver2 extends NameResolver {
SynchronizationContext syncContext,
XdsClientPoolFactory xdsClientPoolFactory) {
this(name, serviceConfigParser, syncContext, Bootstrapper.getInstance(),
xdsClientPoolFactory, ThreadSafeRandomImpl.instance);
XdsChannelFactory.getInstance(), xdsClientPoolFactory, ThreadSafeRandomImpl.instance);
}

XdsNameResolver2(
String name,
ServiceConfigParser serviceConfigParser,
SynchronizationContext syncContext,
Bootstrapper bootstrapper,
XdsChannelFactory channelFactory,
XdsClientPoolFactory xdsClientPoolFactory,
ThreadSafeRandom random) {
authority = GrpcUtil.checkAuthority(checkNotNull(name, "name"));
this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser");
this.syncContext = checkNotNull(syncContext, "syncContext");
this.bootstrapper = checkNotNull(bootstrapper, "bootstrapper");
this.channelFactory = checkNotNull(channelFactory, "channelFactory");
this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory");
this.random = checkNotNull(random, "random");
logger = XdsLogger.withLogId(InternalLogId.allocate("xds-resolver", name));
Expand All @@ -114,14 +118,16 @@ public String getServiceAuthority() {
public void start(Listener2 listener) {
this.listener = checkNotNull(listener, "listener");
BootstrapInfo bootstrapInfo;
XdsChannel channel;
try {
bootstrapInfo = bootstrapper.readBootstrap();
channel = channelFactory.createChannel(bootstrapInfo.getServers());
} catch (Exception e) {
listener.onError(
Status.UNAVAILABLE.withDescription("Failed to load xDS bootstrap").withCause(e));
Status.UNAVAILABLE.withDescription("Failed to initialize xDS").withCause(e));
return;
}
xdsClientPool = xdsClientPoolFactory.newXdsClientObjectPool(bootstrapInfo);
xdsClientPool = xdsClientPoolFactory.newXdsClientObjectPool(bootstrapInfo, channel);
xdsClient = xdsClientPool.getObject();
xdsClient.watchConfigData(authority, new ConfigWatcherImpl());
}
Expand Down
26 changes: 13 additions & 13 deletions xds/src/main/java/io/grpc/xds/XdsNameResolverProvider2.java
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
Expand All @@ -31,9 +32,8 @@
import io.grpc.internal.ObjectPool;
import io.grpc.xds.Bootstrapper.BootstrapInfo;
import io.grpc.xds.XdsClient.RefCountedXdsClientObjectPool;
import io.grpc.xds.XdsClient.XdsChannelFactory;
import io.grpc.xds.XdsClient.XdsChannel;
import io.grpc.xds.XdsClient.XdsClientFactory;
import io.grpc.xds.XdsClient.XdsClientPoolFactory;
import java.net.URI;
import java.util.concurrent.ScheduledExecutorService;

Expand Down Expand Up @@ -64,11 +64,8 @@ public XdsNameResolver2 newNameResolver(URI targetUri, Args args) {
String name = targetPath.substring(1);
XdsClientPoolFactory xdsClientPoolFactory =
new RefCountedXdsClientPoolFactory(
name,
XdsChannelFactory.getInstance(),
args.getSynchronizationContext(), args.getScheduledExecutorService(),
new ExponentialBackoffPolicy.Provider(),
GrpcUtil.STOPWATCH_SUPPLIER);
name, args.getSynchronizationContext(), args.getScheduledExecutorService(),
new ExponentialBackoffPolicy.Provider(), GrpcUtil.STOPWATCH_SUPPLIER);
return new XdsNameResolver2(
name, args.getServiceConfigParser(),
args.getSynchronizationContext(), xdsClientPoolFactory);
Expand All @@ -95,38 +92,41 @@ protected int priority() {

static class RefCountedXdsClientPoolFactory implements XdsClientPoolFactory {
private final String serviceName;
private final XdsChannelFactory channelFactory;
private final SynchronizationContext syncContext;
private final ScheduledExecutorService timeService;
private final BackoffPolicy.Provider backoffPolicyProvider;
private final Supplier<Stopwatch> stopwatchSupplier;

RefCountedXdsClientPoolFactory(
String serviceName,
XdsChannelFactory channelFactory,
SynchronizationContext syncContext,
ScheduledExecutorService timeService,
BackoffPolicy.Provider backoffPolicyProvider,
Supplier<Stopwatch> stopwatchSupplier) {
this.serviceName = checkNotNull(serviceName, "serviceName");
this.channelFactory = checkNotNull(channelFactory, "channelFactory");
this.syncContext = checkNotNull(syncContext, "syncContext");
this.timeService = checkNotNull(timeService, "timeService");
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
}

@Override
public ObjectPool<XdsClient> newXdsClientObjectPool(final BootstrapInfo bootstrapInfo) {
public ObjectPool<XdsClient> newXdsClientObjectPool(
final BootstrapInfo bootstrapInfo, final XdsChannel channel) {
XdsClientFactory xdsClientFactory = new XdsClientFactory() {
@Override
XdsClient createXdsClient() {
return new XdsClientImpl(
serviceName, bootstrapInfo.getServers(), channelFactory, bootstrapInfo.getNode(),
syncContext, timeService, backoffPolicyProvider, stopwatchSupplier);
serviceName, channel, bootstrapInfo.getNode(), syncContext, timeService,
backoffPolicyProvider, stopwatchSupplier);
}
};
return new RefCountedXdsClientObjectPool(xdsClientFactory);
}
}

@VisibleForTesting
interface XdsClientPoolFactory {
ObjectPool<XdsClient> newXdsClientObjectPool(BootstrapInfo bootstrapInfo, XdsChannel channel);
}
}
18 changes: 1 addition & 17 deletions xds/src/test/java/io/grpc/xds/XdsClientImplTest.java
Expand Up @@ -86,8 +86,6 @@
import io.grpc.internal.FakeClock.TaskFilter;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.xds.Bootstrapper.ChannelCreds;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.EnvoyProtoData.DropOverload;
import io.grpc.xds.EnvoyProtoData.LbEndpoint;
import io.grpc.xds.EnvoyProtoData.Locality;
Expand All @@ -100,7 +98,6 @@
import io.grpc.xds.XdsClient.EndpointUpdate;
import io.grpc.xds.XdsClient.EndpointWatcher;
import io.grpc.xds.XdsClient.XdsChannel;
import io.grpc.xds.XdsClient.XdsChannelFactory;
import io.grpc.xds.XdsClientImpl.MessagePrinter;
import java.io.IOException;
import java.util.ArrayDeque;
Expand Down Expand Up @@ -279,23 +276,10 @@ public void cancelled(Context context) {
channel =
cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build());

List<ServerInfo> servers =
ImmutableList.of(new ServerInfo(serverName, ImmutableList.<ChannelCreds>of(), null));
XdsChannelFactory channelFactory = new XdsChannelFactory() {
@Override
XdsChannel createChannel(List<ServerInfo> servers) {
ServerInfo serverInfo = Iterables.getOnlyElement(servers);
assertThat(serverInfo.getServerUri()).isEqualTo(serverName);
assertThat(serverInfo.getChannelCredentials()).isEmpty();
return new XdsChannel(channel, /* useProtocolV3= */ true);
}
};

xdsClient =
new XdsClientImpl(
TARGET_AUTHORITY,
servers,
channelFactory,
new XdsChannel(channel, /* useProtocolV3= */ true),
EnvoyProtoData.Node.newBuilder().build(),
syncContext,
fakeClock.getScheduledExecutorService(),
Expand Down
18 changes: 1 addition & 17 deletions xds/src/test/java/io/grpc/xds/XdsClientImplTestV2.java
Expand Up @@ -86,8 +86,6 @@
import io.grpc.internal.FakeClock.TaskFilter;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.xds.Bootstrapper.ChannelCreds;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.EnvoyProtoData.DropOverload;
import io.grpc.xds.EnvoyProtoData.LbEndpoint;
import io.grpc.xds.EnvoyProtoData.Locality;
Expand All @@ -100,7 +98,6 @@
import io.grpc.xds.XdsClient.EndpointUpdate;
import io.grpc.xds.XdsClient.EndpointWatcher;
import io.grpc.xds.XdsClient.XdsChannel;
import io.grpc.xds.XdsClient.XdsChannelFactory;
import io.grpc.xds.XdsClientImpl.MessagePrinter;
import java.io.IOException;
import java.util.ArrayDeque;
Expand Down Expand Up @@ -278,23 +275,10 @@ public void cancelled(Context context) {
channel =
cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build());

List<ServerInfo> servers =
ImmutableList.of(new ServerInfo(serverName, ImmutableList.<ChannelCreds>of(), null));
XdsChannelFactory channelFactory = new XdsChannelFactory() {
@Override
XdsChannel createChannel(List<ServerInfo> servers) {
ServerInfo serverInfo = Iterables.getOnlyElement(servers);
assertThat(serverInfo.getServerUri()).isEqualTo(serverName);
assertThat(serverInfo.getChannelCredentials()).isEmpty();
return new XdsChannel(channel, false);
}
};

xdsClient =
new XdsClientImpl(
TARGET_AUTHORITY,
servers,
channelFactory,
new XdsChannel(channel, /* useProtocolV3= */ false),
Node.newBuilder().build(),
syncContext,
fakeClock.getScheduledExecutorService(),
Expand Down

0 comments on commit 311ed9d

Please sign in to comment.