Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc-js-xds: Implement aggregate and logical DNS clusters #2343

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions packages/grpc-js-xds/src/generated/cluster.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions packages/grpc-js-xds/src/index.ts
Expand Up @@ -17,7 +17,8 @@

import * as resolver_xds from './resolver-xds';
import * as load_balancer_cds from './load-balancer-cds';
import * as load_balancer_eds from './load-balancer-eds';
import * as xds_cluster_resolver from './load-balancer-xds-cluster-resolver';
import * as xds_cluster_impl from './load-balancer-xds-cluster-impl';
import * as load_balancer_lrs from './load-balancer-lrs';
import * as load_balancer_priority from './load-balancer-priority';
import * as load_balancer_weighted_target from './load-balancer-weighted-target';
Expand All @@ -32,7 +33,8 @@ import * as csds from './csds';
export function register() {
resolver_xds.setup();
load_balancer_cds.setup();
load_balancer_eds.setup();
xds_cluster_resolver.setup();
xds_cluster_impl.setup();
load_balancer_lrs.setup();
load_balancer_priority.setup();
load_balancer_weighted_target.setup();
Expand Down
231 changes: 171 additions & 60 deletions packages/grpc-js-xds/src/load-balancer-cds.ts
Expand Up @@ -28,11 +28,13 @@ import LoadBalancingConfig = experimental.LoadBalancingConfig;
import OutlierDetectionLoadBalancingConfig = experimental.OutlierDetectionLoadBalancingConfig;
import SuccessRateEjectionConfig = experimental.SuccessRateEjectionConfig;
import FailurePercentageEjectionConfig = experimental.FailurePercentageEjectionConfig;
import { EdsLoadBalancingConfig } from './load-balancer-eds';
import QueuePicker = experimental.QueuePicker;
import { Watcher } from './xds-stream-state/xds-stream-state';
import { OutlierDetection__Output } from './generated/envoy/config/cluster/v3/OutlierDetection';
import { Duration__Output } from './generated/google/protobuf/Duration';
import { EXPERIMENTAL_OUTLIER_DETECTION } from './environment';
import { DiscoveryMechanism, XdsClusterResolverChildPolicyHandler, XdsClusterResolverLoadBalancingConfig } from './load-balancer-xds-cluster-resolver';
import { CLUSTER_CONFIG_TYPE_URL, decodeSingleResource } from './resources';

const TRACER_NAME = 'cds_balancer';

Expand Down Expand Up @@ -115,57 +117,161 @@ function translateOutlierDetectionConfig(outlierDetection: OutlierDetection__Out
);
}

interface ClusterEntry {
watcher: Watcher<Cluster__Output>;
latestUpdate?: Cluster__Output;
children: string[];
}

interface ClusterTree {
[name: string]: ClusterEntry;
}

function isClusterTreeFullyUpdated(tree: ClusterTree, root: string): boolean {
const toCheck: string[] = [root];
const visited = new Set<string>();
while (toCheck.length > 0) {
const next = toCheck.shift()!;
if (visited.has(next)) {
continue;
}
visited.add(next);
if (!tree[next] || !tree[next].latestUpdate) {
return false;
}
toCheck.push(...tree[next].children);
}
return true;
}

function generateDiscoveryMechanismForCluster(config: Cluster__Output): DiscoveryMechanism {
let maxConcurrentRequests: number | undefined = undefined;
for (const threshold of config.circuit_breakers?.thresholds ?? []) {
if (threshold.priority === 'DEFAULT') {
maxConcurrentRequests = threshold.max_requests?.value;
}
}
if (config.type === 'EDS') {
// EDS cluster
return {
cluster: config.name,
lrs_load_reporting_server_name: config.lrs_server?.self ? '' : undefined,
max_concurrent_requests: maxConcurrentRequests,
type: 'EDS',
eds_service_name: config.eds_cluster_config!.service_name === '' ? undefined : config.eds_cluster_config!.service_name
};
} else {
// Logical DNS cluster
const socketAddress = config.load_assignment!.endpoints[0].lb_endpoints[0].endpoint!.address!.socket_address!;
return {
cluster: config.name,
lrs_load_reporting_server_name: config.lrs_server?.self ? '' : undefined,
max_concurrent_requests: maxConcurrentRequests,
type: 'LOGICAL_DNS',
dns_hostname: `${socketAddress.address}:${socketAddress.port_value}`
};
}
}

const RECURSION_DEPTH_LIMIT = 15;

/**
* Prerequisite: isClusterTreeFullyUpdated(tree, root)
* @param tree
* @param root
*/
function getDiscoveryMechanismList(tree: ClusterTree, root: string): DiscoveryMechanism[] {
const visited = new Set<string>();
function getDiscoveryMechanismListHelper(node: string, depth: number): DiscoveryMechanism[] {
if (depth > RECURSION_DEPTH_LIMIT) {
throw new Error('aggregate cluster graph exceeds max depth');
}
if (visited.has(node)) {
return [];
}
visited.add(node);
if (tree[node].children.length > 0) {
trace('Visit ' + node + ' children: [' + tree[node].children + ']');
// Aggregate cluster
const result = [];
for (const child of tree[node].children) {
result.push(...getDiscoveryMechanismListHelper(child, depth + 1));
}
return result;
} else {
trace('Visit leaf ' + node);
// individual cluster
const config = tree[node].latestUpdate!;
return [generateDiscoveryMechanismForCluster(config)];
}
}
return getDiscoveryMechanismListHelper(root, 0);
}

export class CdsLoadBalancer implements LoadBalancer {
private childBalancer: ChildLoadBalancerHandler;
private watcher: Watcher<Cluster__Output>;

private isWatcherActive = false;

private latestCdsUpdate: Cluster__Output | null = null;

private latestConfig: CdsLoadBalancingConfig | null = null;
private latestAttributes: { [key: string]: unknown } = {};
private xdsClient: XdsClient | null = null;

private clusterTree: ClusterTree = {};

private updatedChild = false;

constructor(private readonly channelControlHelper: ChannelControlHelper) {
this.childBalancer = new ChildLoadBalancerHandler(channelControlHelper);
this.watcher = {
this.childBalancer = new XdsClusterResolverChildPolicyHandler(channelControlHelper);
}

private addCluster(cluster: string) {
if (cluster in this.clusterTree) {
return;
}
trace('Adding watcher for cluster ' + cluster);
const watcher: Watcher<Cluster__Output> = {
onValidUpdate: (update) => {
this.latestCdsUpdate = update;
let maxConcurrentRequests: number | undefined = undefined;
for (const threshold of update.circuit_breakers?.thresholds ?? []) {
if (threshold.priority === 'DEFAULT') {
maxConcurrentRequests = threshold.max_requests?.value;
this.clusterTree[cluster].latestUpdate = update;
if (update.cluster_discovery_type === 'cluster_type') {
const children = decodeSingleResource(CLUSTER_CONFIG_TYPE_URL, update.cluster_type!.typed_config!.value).clusters;
trace('Received update for aggregate cluster ' + cluster + ' with children [' + children + ']');
this.clusterTree[cluster].children = children;
children.forEach(child => this.addCluster(child));
}
if (isClusterTreeFullyUpdated(this.clusterTree, this.latestConfig!.getCluster())) {
let discoveryMechanismList: DiscoveryMechanism[];
try {
discoveryMechanismList = getDiscoveryMechanismList(this.clusterTree, this.latestConfig!.getCluster());
} catch (e) {
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: e.message, metadata: new Metadata()}));
return;
}
const clusterResolverConfig = new XdsClusterResolverLoadBalancingConfig(
discoveryMechanismList,
[],
[]
);
trace('Child update config: ' + JSON.stringify(clusterResolverConfig));
this.updatedChild = true;
this.childBalancer.updateAddressList(
[],
clusterResolverConfig,
this.latestAttributes
);
}
/* the lrs_server.self field indicates that the same server should be
* used for load reporting as for other xDS operations. Setting
* lrsLoadReportingServerName to the empty string sets that behavior.
* Otherwise, if the field is omitted, load reporting is disabled. */
const edsConfig: EdsLoadBalancingConfig = new EdsLoadBalancingConfig(
/* cluster= */ update.name,
/* localityPickingPolicy= */ [],
/* endpointPickingPolicy= */ [],
/* edsServiceName= */ update.eds_cluster_config!.service_name === '' ? undefined : update.eds_cluster_config!.service_name,
/* lrsLoadReportingServerName= */update.lrs_server?.self ? '' : undefined,
/* maxConcurrentRequests= */ maxConcurrentRequests,
/* outlierDetection= */ translateOutlierDetectionConfig(update.outlier_detection)
);
trace('Child update EDS config: ' + JSON.stringify(edsConfig));
this.childBalancer.updateAddressList(
[],
edsConfig,
this.latestAttributes
);
},
onResourceDoesNotExist: () => {
this.isWatcherActive = false;
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: 'CDS resource does not exist', metadata: new Metadata()}));
if (cluster in this.clusterTree) {
this.clusterTree[cluster].latestUpdate = undefined;
this.clusterTree[cluster].children = [];
}
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: `CDS resource ${cluster} does not exist`, metadata: new Metadata()}));
this.childBalancer.destroy();
},
onTransientError: (statusObj) => {
if (this.latestCdsUpdate === null) {
channelControlHelper.updateState(
if (!this.updatedChild) {
this.channelControlHelper.updateState(
connectivityState.TRANSIENT_FAILURE,
new UnavailablePicker({
code: status.UNAVAILABLE,
Expand All @@ -174,8 +280,27 @@ export class CdsLoadBalancer implements LoadBalancer {
})
);
}
},
}
};
this.clusterTree[cluster] = {
watcher: watcher,
children: []
};
this.xdsClient?.addClusterWatcher(cluster, watcher);
}

private removeCluster(cluster: string) {
if (!(cluster in this.clusterTree)) {
return;
}
this.xdsClient?.removeClusterWatcher(cluster, this.clusterTree[cluster].watcher);
delete this.clusterTree[cluster];
}

private clearClusterTree() {
for (const cluster of Object.keys(this.clusterTree)) {
this.removeCluster(cluster);
}
}

updateAddressList(
Expand All @@ -194,29 +319,20 @@ export class CdsLoadBalancer implements LoadBalancer {
/* If the cluster is changing, disable the old watcher before adding the new
* one */
if (
this.isWatcherActive &&
this.latestConfig?.getCluster() !== lbConfig.getCluster()
this.latestConfig && this.latestConfig.getCluster() !== lbConfig.getCluster()
) {
trace('Removing old cluster watcher for cluster name ' + this.latestConfig!.getCluster());
this.xdsClient.removeClusterWatcher(
this.latestConfig!.getCluster(),
this.watcher
);
/* Setting isWatcherActive to false here lets us have one code path for
* calling addClusterWatcher */
this.isWatcherActive = false;
/* If we have a new name, the latestCdsUpdate does not correspond to
* the new config, so it is no longer valid */
this.latestCdsUpdate = null;
trace('Removing old cluster watchers rooted at ' + this.latestConfig.getCluster());
this.clearClusterTree();
this.updatedChild = false;
}

if (!this.latestConfig) {
this.channelControlHelper.updateState(connectivityState.CONNECTING, new QueuePicker(this));
}

this.latestConfig = lbConfig;

if (!this.isWatcherActive) {
trace('Adding new cluster watcher for cluster name ' + lbConfig.getCluster());
this.xdsClient.addClusterWatcher(lbConfig.getCluster(), this.watcher);
this.isWatcherActive = true;
}
this.addCluster(lbConfig.getCluster());
}
exitIdle(): void {
this.childBalancer.exitIdle();
Expand All @@ -225,14 +341,9 @@ export class CdsLoadBalancer implements LoadBalancer {
this.childBalancer.resetBackoff();
}
destroy(): void {
trace('Destroying load balancer with cluster name ' + this.latestConfig?.getCluster());
trace('Destroying load balancer rooted at cluster named ' + this.latestConfig?.getCluster());
this.childBalancer.destroy();
if (this.isWatcherActive) {
this.xdsClient?.removeClusterWatcher(
this.latestConfig!.getCluster(),
this.watcher
);
}
this.clearClusterTree();
}
getTypeName(): string {
return TYPE_NAME;
Expand Down