Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Have EDS resource parse the additional addresses from envoy message #11011

Merged
merged 8 commits into from
Mar 15, 2024
23 changes: 17 additions & 6 deletions repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,16 @@ IO_GRPC_GRPC_JAVA_OVERRIDE_TARGETS = {

def grpc_java_repositories():
"""Imports dependencies for grpc-java."""
if not native.existing_rule("com_github_cncf_udpa"):
http_archive(
name = "com_github_cncf_udpa",
sha256 = "0d33b83f8c6368954e72e7785539f0d272a8aba2f6e2e336ed15fd1514bc9899",
strip_prefix = "xds-e9ce68804cb4e64cab5a52e3c8baf840d4ff87b7",
urls = [
"https://storage.googleapis.com/grpc-bazel-mirror/github.com/cncf/xds/archive/e9ce68804cb4e64cab5a52e3c8baf840d4ff87b7.tar.gz",
"https://github.com/cncf/xds/archive/e9ce68804cb4e64cab5a52e3c8baf840d4ff87b7.tar.gz",
],
)
if not native.existing_rule("com_github_cncf_xds"):
http_archive(
name = "com_github_cncf_xds",
Expand Down Expand Up @@ -130,10 +140,11 @@ def grpc_java_repositories():
if not native.existing_rule("envoy_api"):
http_archive(
name = "envoy_api",
sha256 = "b426904abf51ba21dd8947a05694bb3c861d6f5e436e4673e74d7d7bfb6d3188",
strip_prefix = "data-plane-api-268824e4eee3d7770a347a5dc5aaddc0b1b14e24",
sha256 = "fff067a5d6d776fc88549b5dd4773a6f8f0187b26a859de8b29bd4226a28ee63",
strip_prefix = "data-plane-api-9d6ffa70677c4dbf23f6ed569676206c4e2edff4",
urls = [
"https://github.com/envoyproxy/data-plane-api/archive/268824e4eee3d7770a347a5dc5aaddc0b1b14e24.tar.gz",
"https://storage.googleapis.com/grpc-bazel-mirror/github.com/envoyproxy/data-plane-api/archive/9d6ffa70677c4dbf23f6ed569676206c4e2edff4.tar.gz",
"https://github.com/envoyproxy/data-plane-api/archive/9d6ffa70677c4dbf23f6ed569676206c4e2edff4.tar.gz",
],
)

Expand All @@ -160,7 +171,7 @@ def com_google_protobuf_javalite():
def io_grpc_grpc_proto():
http_archive(
name = "io_grpc_grpc_proto",
sha256 = "464e97a24d7d784d9c94c25fa537ba24127af5aae3edd381007b5b98705a0518",
strip_prefix = "grpc-proto-08911e9d585cbda3a55eb1dcc4b99c89aebccff8",
urls = ["https://github.com/grpc/grpc-proto/archive/08911e9d585cbda3a55eb1dcc4b99c89aebccff8.zip"],
sha256 = "729ac127a003836d539ed9da72a21e094aac4c4609e0481d6fc9e28a844e11af",
strip_prefix = "grpc-proto-4f245d272a28a680606c0739753506880cf33b5f",
urls = ["https://github.com/grpc/grpc-proto/archive/4f245d272a28a680606c0739753506880cf33b5f.zip"],
)
37 changes: 27 additions & 10 deletions xds/src/main/java/io/grpc/xds/XdsEndpointResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.Message;
import io.envoyproxy.envoy.config.core.v3.Address;
import io.envoyproxy.envoy.config.core.v3.HealthStatus;
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
import io.envoyproxy.envoy.config.endpoint.v3.Endpoint;
import io.envoyproxy.envoy.type.v3.FractionalPercent;
import io.grpc.EquivalentAddressGroup;
import io.grpc.internal.GrpcUtil;
import io.grpc.xds.Endpoints.DropOverload;
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
import io.grpc.xds.XdsEndpointResource.EdsUpdate;
Expand All @@ -46,6 +49,8 @@
class XdsEndpointResource extends XdsResourceType<EdsUpdate> {
static final String ADS_TYPE_URL_EDS =
"type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment";
static final String GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS =
"grpc.experimental.xdsDualstackEndpoints";

private static final XdsEndpointResource instance = new XdsEndpointResource();

Expand Down Expand Up @@ -95,6 +100,10 @@ protected EdsUpdate doParse(Args args, Message unpackedMessage) throws ResourceI
return processClusterLoadAssignment((ClusterLoadAssignment) unpackedMessage);
}

private static boolean isEnabledXdsDualStack() {
return GrpcUtil.getFlag(GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS, false);
}

private static EdsUpdate processClusterLoadAssignment(ClusterLoadAssignment assignment)
throws ResourceInvalidException {
Map<Integer, Set<Locality>> priorities = new HashMap<>();
Expand Down Expand Up @@ -190,22 +199,30 @@ static StructOrError<LocalityLbEndpoints> parseLocalityLbEndpoints(
if (!endpoint.hasEndpoint() || !endpoint.getEndpoint().hasAddress()) {
return StructOrError.fromError("LbEndpoint with no endpoint/address");
}
io.envoyproxy.envoy.config.core.v3.SocketAddress socketAddress =
endpoint.getEndpoint().getAddress().getSocketAddress();
InetSocketAddress addr =
new InetSocketAddress(socketAddress.getAddress(), socketAddress.getPortValue());
boolean isHealthy =
endpoint.getHealthStatus() == io.envoyproxy.envoy.config.core.v3.HealthStatus.HEALTHY
|| endpoint.getHealthStatus()
== io.envoyproxy.envoy.config.core.v3.HealthStatus.UNKNOWN;
List<java.net.SocketAddress> addresses = new ArrayList<>();
addresses.add(getInetSocketAddress(endpoint.getEndpoint().getAddress()));
if (isEnabledXdsDualStack()) {
for (Endpoint.AdditionalAddress additionalAddress
: endpoint.getEndpoint().getAdditionalAddressesList()) {
addresses.add(getInetSocketAddress(additionalAddress.getAddress()));
}
}
boolean isHealthy = (endpoint.getHealthStatus() == HealthStatus.HEALTHY)
|| (endpoint.getHealthStatus() == HealthStatus.UNKNOWN);
endpoints.add(Endpoints.LbEndpoint.create(
new EquivalentAddressGroup(ImmutableList.<java.net.SocketAddress>of(addr)),
new EquivalentAddressGroup(addresses),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ImmutableList.copyOf()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addresses is already a new list (created on line 196), so we don't care what is done with it. On top of that the EquivalentAddressGroup constructor copies whatever we give it.
this.addrs = Collections.unmodifiableList(new ArrayList<>(addrs));

endpoint.getLoadBalancingWeight().getValue(), isHealthy));
}
return StructOrError.fromStruct(Endpoints.LocalityLbEndpoints.create(
endpoints, proto.getLoadBalancingWeight().getValue(), proto.getPriority()));
}

private static InetSocketAddress getInetSocketAddress(Address address) {
io.envoyproxy.envoy.config.core.v3.SocketAddress socketAddress = address.getSocketAddress();

return new InetSocketAddress(socketAddress.getAddress(), socketAddress.getPortValue());
}

static final class EdsUpdate implements ResourceUpdate {
final String clusterName;
final Map<Locality, LocalityLbEndpoints> localityLbEndpointsMap;
Expand Down
51 changes: 51 additions & 0 deletions xds/src/test/java/io/grpc/xds/GrpcXdsClientImplDataTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.truth.Truth.assertThat;
import static io.envoyproxy.envoy.config.route.v3.RouteAction.ClusterSpecifierCase.CLUSTER_SPECIFIER_PLUGIN;
import static io.grpc.xds.XdsEndpointResource.GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS;
import static org.junit.Assert.fail;

import com.github.udpa.udpa.type.v1.TypedStruct;
Expand Down Expand Up @@ -108,6 +109,7 @@
import io.envoyproxy.envoy.type.v3.FractionalPercent.DenominatorType;
import io.envoyproxy.envoy.type.v3.Int64Range;
import io.grpc.ClientInterceptor;
import io.grpc.EquivalentAddressGroup;
import io.grpc.InsecureChannelCredentials;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerRegistry;
Expand Down Expand Up @@ -142,6 +144,7 @@
import io.grpc.xds.internal.Matchers;
import io.grpc.xds.internal.Matchers.FractionMatcher;
import io.grpc.xds.internal.Matchers.HeaderMatcher;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -976,6 +979,54 @@ public void parseLocalityLbEndpoints_ignorZeroWeightLocality() {
assertThat(XdsEndpointResource.parseLocalityLbEndpoints(proto)).isNull();
}

@Test
public void parseLocalityLbEndpoints_withDualStackEndpoints() {
String originalDualStackProp =
System.setProperty(GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS, "true");
String v4Address = "172.14.14.5";
String v6Address = "2001:db8::1";
int port = 8888;

try {
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto =
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints.newBuilder()
.setLocality(Locality.newBuilder()
.setRegion("region-foo").setZone("zone-foo").setSubZone("subZone-foo"))
.setLoadBalancingWeight(UInt32Value.newBuilder().setValue(100)) // locality weight
.setPriority(1)
.addLbEndpoints(io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint.newBuilder()
.setEndpoint(Endpoint.newBuilder()
.setAddress(Address.newBuilder()
.setSocketAddress(
SocketAddress.newBuilder()
.setAddress(v4Address).setPortValue(port)))
.addAdditionalAddresses(Endpoint.AdditionalAddress.newBuilder()
.setAddress(Address.newBuilder()
.setSocketAddress(
SocketAddress.newBuilder()
.setAddress(v6Address).setPortValue(port)))))
.setHealthStatus(io.envoyproxy.envoy.config.core.v3.HealthStatus.HEALTHY)
.setLoadBalancingWeight(UInt32Value.newBuilder().setValue(20)))
.build();

StructOrError<LocalityLbEndpoints> struct =
XdsEndpointResource.parseLocalityLbEndpoints(proto);
assertThat(struct.getErrorDetail()).isNull();
List<java.net.SocketAddress> socketAddressList = Arrays.asList(
new InetSocketAddress(v4Address, port), new InetSocketAddress(v6Address, port));
EquivalentAddressGroup expectedEag = new EquivalentAddressGroup(socketAddressList);
assertThat(struct.getStruct()).isEqualTo(
LocalityLbEndpoints.create(
Collections.singletonList(LbEndpoint.create(expectedEag, 20, true)), 100, 1));
} finally {
if (originalDualStackProp != null) {
System.setProperty(GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS, originalDualStackProp);
} else {
System.clearProperty(GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS);
}
}
}

@Test
public void parseLocalityLbEndpoints_invalidPriority() {
io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto =
Expand Down