Skip to content

Commit

Permalink
Merge pull request #2343 from murgatroid99/grpc-js-xds_aggregate_logi…
Browse files Browse the repository at this point in the history
…cal_dns_clusters

grpc-js-xds: Implement aggregate and logical DNS clusters
  • Loading branch information
murgatroid99 committed Feb 6, 2023
2 parents e6ea6f5 + 4e148cb commit dd7e1a9
Show file tree
Hide file tree
Showing 19 changed files with 1,378 additions and 645 deletions.
9 changes: 9 additions & 0 deletions packages/grpc-js-xds/src/generated/cluster.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions packages/grpc-js-xds/src/index.ts
Expand Up @@ -17,7 +17,8 @@

import * as resolver_xds from './resolver-xds';
import * as load_balancer_cds from './load-balancer-cds';
import * as load_balancer_eds from './load-balancer-eds';
import * as xds_cluster_resolver from './load-balancer-xds-cluster-resolver';
import * as xds_cluster_impl from './load-balancer-xds-cluster-impl';
import * as load_balancer_lrs from './load-balancer-lrs';
import * as load_balancer_priority from './load-balancer-priority';
import * as load_balancer_weighted_target from './load-balancer-weighted-target';
Expand All @@ -32,7 +33,8 @@ import * as csds from './csds';
export function register() {
resolver_xds.setup();
load_balancer_cds.setup();
load_balancer_eds.setup();
xds_cluster_resolver.setup();
xds_cluster_impl.setup();
load_balancer_lrs.setup();
load_balancer_priority.setup();
load_balancer_weighted_target.setup();
Expand Down
231 changes: 171 additions & 60 deletions packages/grpc-js-xds/src/load-balancer-cds.ts
Expand Up @@ -28,11 +28,13 @@ import LoadBalancingConfig = experimental.LoadBalancingConfig;
import OutlierDetectionLoadBalancingConfig = experimental.OutlierDetectionLoadBalancingConfig;
import SuccessRateEjectionConfig = experimental.SuccessRateEjectionConfig;
import FailurePercentageEjectionConfig = experimental.FailurePercentageEjectionConfig;
import { EdsLoadBalancingConfig } from './load-balancer-eds';
import QueuePicker = experimental.QueuePicker;
import { Watcher } from './xds-stream-state/xds-stream-state';
import { OutlierDetection__Output } from './generated/envoy/config/cluster/v3/OutlierDetection';
import { Duration__Output } from './generated/google/protobuf/Duration';
import { EXPERIMENTAL_OUTLIER_DETECTION } from './environment';
import { DiscoveryMechanism, XdsClusterResolverChildPolicyHandler, XdsClusterResolverLoadBalancingConfig } from './load-balancer-xds-cluster-resolver';
import { CLUSTER_CONFIG_TYPE_URL, decodeSingleResource } from './resources';

const TRACER_NAME = 'cds_balancer';

Expand Down Expand Up @@ -115,57 +117,161 @@ function translateOutlierDetectionConfig(outlierDetection: OutlierDetection__Out
);
}

interface ClusterEntry {
watcher: Watcher<Cluster__Output>;
latestUpdate?: Cluster__Output;
children: string[];
}

interface ClusterTree {
[name: string]: ClusterEntry;
}

function isClusterTreeFullyUpdated(tree: ClusterTree, root: string): boolean {
const toCheck: string[] = [root];
const visited = new Set<string>();
while (toCheck.length > 0) {
const next = toCheck.shift()!;
if (visited.has(next)) {
continue;
}
visited.add(next);
if (!tree[next] || !tree[next].latestUpdate) {
return false;
}
toCheck.push(...tree[next].children);
}
return true;
}

function generateDiscoveryMechanismForCluster(config: Cluster__Output): DiscoveryMechanism {
let maxConcurrentRequests: number | undefined = undefined;
for (const threshold of config.circuit_breakers?.thresholds ?? []) {
if (threshold.priority === 'DEFAULT') {
maxConcurrentRequests = threshold.max_requests?.value;
}
}
if (config.type === 'EDS') {
// EDS cluster
return {
cluster: config.name,
lrs_load_reporting_server_name: config.lrs_server?.self ? '' : undefined,
max_concurrent_requests: maxConcurrentRequests,
type: 'EDS',
eds_service_name: config.eds_cluster_config!.service_name === '' ? undefined : config.eds_cluster_config!.service_name
};
} else {
// Logical DNS cluster
const socketAddress = config.load_assignment!.endpoints[0].lb_endpoints[0].endpoint!.address!.socket_address!;
return {
cluster: config.name,
lrs_load_reporting_server_name: config.lrs_server?.self ? '' : undefined,
max_concurrent_requests: maxConcurrentRequests,
type: 'LOGICAL_DNS',
dns_hostname: `${socketAddress.address}:${socketAddress.port_value}`
};
}
}

const RECURSION_DEPTH_LIMIT = 15;

/**
* Prerequisite: isClusterTreeFullyUpdated(tree, root)
* @param tree
* @param root
*/
function getDiscoveryMechanismList(tree: ClusterTree, root: string): DiscoveryMechanism[] {
const visited = new Set<string>();
function getDiscoveryMechanismListHelper(node: string, depth: number): DiscoveryMechanism[] {
if (depth > RECURSION_DEPTH_LIMIT) {
throw new Error('aggregate cluster graph exceeds max depth');
}
if (visited.has(node)) {
return [];
}
visited.add(node);
if (tree[node].children.length > 0) {
trace('Visit ' + node + ' children: [' + tree[node].children + ']');
// Aggregate cluster
const result = [];
for (const child of tree[node].children) {
result.push(...getDiscoveryMechanismListHelper(child, depth + 1));
}
return result;
} else {
trace('Visit leaf ' + node);
// individual cluster
const config = tree[node].latestUpdate!;
return [generateDiscoveryMechanismForCluster(config)];
}
}
return getDiscoveryMechanismListHelper(root, 0);
}

export class CdsLoadBalancer implements LoadBalancer {
private childBalancer: ChildLoadBalancerHandler;
private watcher: Watcher<Cluster__Output>;

private isWatcherActive = false;

private latestCdsUpdate: Cluster__Output | null = null;

private latestConfig: CdsLoadBalancingConfig | null = null;
private latestAttributes: { [key: string]: unknown } = {};
private xdsClient: XdsClient | null = null;

private clusterTree: ClusterTree = {};

private updatedChild = false;

constructor(private readonly channelControlHelper: ChannelControlHelper) {
this.childBalancer = new ChildLoadBalancerHandler(channelControlHelper);
this.watcher = {
this.childBalancer = new XdsClusterResolverChildPolicyHandler(channelControlHelper);
}

private addCluster(cluster: string) {
if (cluster in this.clusterTree) {
return;
}
trace('Adding watcher for cluster ' + cluster);
const watcher: Watcher<Cluster__Output> = {
onValidUpdate: (update) => {
this.latestCdsUpdate = update;
let maxConcurrentRequests: number | undefined = undefined;
for (const threshold of update.circuit_breakers?.thresholds ?? []) {
if (threshold.priority === 'DEFAULT') {
maxConcurrentRequests = threshold.max_requests?.value;
this.clusterTree[cluster].latestUpdate = update;
if (update.cluster_discovery_type === 'cluster_type') {
const children = decodeSingleResource(CLUSTER_CONFIG_TYPE_URL, update.cluster_type!.typed_config!.value).clusters;
trace('Received update for aggregate cluster ' + cluster + ' with children [' + children + ']');
this.clusterTree[cluster].children = children;
children.forEach(child => this.addCluster(child));
}
if (isClusterTreeFullyUpdated(this.clusterTree, this.latestConfig!.getCluster())) {
let discoveryMechanismList: DiscoveryMechanism[];
try {
discoveryMechanismList = getDiscoveryMechanismList(this.clusterTree, this.latestConfig!.getCluster());
} catch (e) {
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: e.message, metadata: new Metadata()}));
return;
}
const clusterResolverConfig = new XdsClusterResolverLoadBalancingConfig(
discoveryMechanismList,
[],
[]
);
trace('Child update config: ' + JSON.stringify(clusterResolverConfig));
this.updatedChild = true;
this.childBalancer.updateAddressList(
[],
clusterResolverConfig,
this.latestAttributes
);
}
/* the lrs_server.self field indicates that the same server should be
* used for load reporting as for other xDS operations. Setting
* lrsLoadReportingServerName to the empty string sets that behavior.
* Otherwise, if the field is omitted, load reporting is disabled. */
const edsConfig: EdsLoadBalancingConfig = new EdsLoadBalancingConfig(
/* cluster= */ update.name,
/* localityPickingPolicy= */ [],
/* endpointPickingPolicy= */ [],
/* edsServiceName= */ update.eds_cluster_config!.service_name === '' ? undefined : update.eds_cluster_config!.service_name,
/* lrsLoadReportingServerName= */update.lrs_server?.self ? '' : undefined,
/* maxConcurrentRequests= */ maxConcurrentRequests,
/* outlierDetection= */ translateOutlierDetectionConfig(update.outlier_detection)
);
trace('Child update EDS config: ' + JSON.stringify(edsConfig));
this.childBalancer.updateAddressList(
[],
edsConfig,
this.latestAttributes
);
},
onResourceDoesNotExist: () => {
this.isWatcherActive = false;
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: 'CDS resource does not exist', metadata: new Metadata()}));
if (cluster in this.clusterTree) {
this.clusterTree[cluster].latestUpdate = undefined;
this.clusterTree[cluster].children = [];
}
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: `CDS resource ${cluster} does not exist`, metadata: new Metadata()}));
this.childBalancer.destroy();
},
onTransientError: (statusObj) => {
if (this.latestCdsUpdate === null) {
channelControlHelper.updateState(
if (!this.updatedChild) {
this.channelControlHelper.updateState(
connectivityState.TRANSIENT_FAILURE,
new UnavailablePicker({
code: status.UNAVAILABLE,
Expand All @@ -174,8 +280,27 @@ export class CdsLoadBalancer implements LoadBalancer {
})
);
}
},
}
};
this.clusterTree[cluster] = {
watcher: watcher,
children: []
};
this.xdsClient?.addClusterWatcher(cluster, watcher);
}

private removeCluster(cluster: string) {
if (!(cluster in this.clusterTree)) {
return;
}
this.xdsClient?.removeClusterWatcher(cluster, this.clusterTree[cluster].watcher);
delete this.clusterTree[cluster];
}

private clearClusterTree() {
for (const cluster of Object.keys(this.clusterTree)) {
this.removeCluster(cluster);
}
}

updateAddressList(
Expand All @@ -194,29 +319,20 @@ export class CdsLoadBalancer implements LoadBalancer {
/* If the cluster is changing, disable the old watcher before adding the new
* one */
if (
this.isWatcherActive &&
this.latestConfig?.getCluster() !== lbConfig.getCluster()
this.latestConfig && this.latestConfig.getCluster() !== lbConfig.getCluster()
) {
trace('Removing old cluster watcher for cluster name ' + this.latestConfig!.getCluster());
this.xdsClient.removeClusterWatcher(
this.latestConfig!.getCluster(),
this.watcher
);
/* Setting isWatcherActive to false here lets us have one code path for
* calling addClusterWatcher */
this.isWatcherActive = false;
/* If we have a new name, the latestCdsUpdate does not correspond to
* the new config, so it is no longer valid */
this.latestCdsUpdate = null;
trace('Removing old cluster watchers rooted at ' + this.latestConfig.getCluster());
this.clearClusterTree();
this.updatedChild = false;
}

if (!this.latestConfig) {
this.channelControlHelper.updateState(connectivityState.CONNECTING, new QueuePicker(this));
}

this.latestConfig = lbConfig;

if (!this.isWatcherActive) {
trace('Adding new cluster watcher for cluster name ' + lbConfig.getCluster());
this.xdsClient.addClusterWatcher(lbConfig.getCluster(), this.watcher);
this.isWatcherActive = true;
}
this.addCluster(lbConfig.getCluster());
}
exitIdle(): void {
this.childBalancer.exitIdle();
Expand All @@ -225,14 +341,9 @@ export class CdsLoadBalancer implements LoadBalancer {
this.childBalancer.resetBackoff();
}
destroy(): void {
trace('Destroying load balancer with cluster name ' + this.latestConfig?.getCluster());
trace('Destroying load balancer rooted at cluster named ' + this.latestConfig?.getCluster());
this.childBalancer.destroy();
if (this.isWatcherActive) {
this.xdsClient?.removeClusterWatcher(
this.latestConfig!.getCluster(),
this.watcher
);
}
this.clearClusterTree();
}
getTypeName(): string {
return TYPE_NAME;
Expand Down

0 comments on commit dd7e1a9

Please sign in to comment.