Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: fix a concurrency issue in CSDS ClientStatus responses #8795

Merged
merged 7 commits into from
Jan 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
35 changes: 28 additions & 7 deletions xds/src/main/java/io/grpc/xds/ClientXdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.Any;
import com.google.protobuf.Duration;
import com.google.protobuf.InvalidProtocolBufferException;
Expand Down Expand Up @@ -158,7 +161,6 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res
static boolean enableRouteLookup =
!Strings.isNullOrEmpty(System.getenv("GRPC_EXPERIMENTAL_XDS_RLS_LB"))
&& Boolean.parseBoolean(System.getenv("GRPC_EXPERIMENTAL_XDS_RLS_LB"));

private static final String TYPE_URL_HTTP_CONNECTION_MANAGER_V2 =
"type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2"
+ ".HttpConnectionManager";
Expand Down Expand Up @@ -2028,12 +2030,31 @@ public Collection<String> getSubscribedResources(ServerInfo serverInfo, Resource
}

@Override
Map<String, ResourceMetadata> getSubscribedResourcesMetadata(ResourceType type) {
Map<String, ResourceMetadata> metadataMap = new HashMap<>();
for (Map.Entry<String, ResourceSubscriber> entry : getSubscribedResourcesMap(type).entrySet()) {
metadataMap.put(entry.getKey(), entry.getValue().metadata);
}
return metadataMap;
ListenableFuture<Map<ResourceType, Map<String, ResourceMetadata>>>
getSubscribedResourcesMetadataSnapshot() {
final SettableFuture<Map<ResourceType, Map<String, ResourceMetadata>>> future =
SettableFuture.create();
syncContext.execute(new Runnable() {
@Override
public void run() {
// A map from a "resource type" to a map ("resource name": "resource metadata")
ImmutableMap.Builder<ResourceType, Map<String, ResourceMetadata>> metadataSnapshot =
ImmutableMap.builder();
for (ResourceType type : ResourceType.values()) {
if (type == ResourceType.UNKNOWN) {
continue;
}
ImmutableMap.Builder<String, ResourceMetadata> metadataMap = ImmutableMap.builder();
for (Map.Entry<String, ResourceSubscriber> resourceEntry
: getSubscribedResourcesMap(type).entrySet()) {
metadataMap.put(resourceEntry.getKey(), resourceEntry.getValue().metadata);
}
metadataSnapshot.put(type, metadataMap.build());
}
future.set(metadataSnapshot.build());
}
});
return future;
}

@Override
Expand Down
53 changes: 41 additions & 12 deletions xds/src/main/java/io/grpc/xds/CsdsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package io.grpc.xds;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Verify.verifyNotNull;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.util.Timestamps;
import io.envoyproxy.envoy.admin.v3.ClientResourceStatus;
import io.envoyproxy.envoy.service.status.v3.ClientConfig;
Expand All @@ -37,6 +39,9 @@
import io.grpc.xds.XdsClient.ResourceMetadata.UpdateFailureState;
import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -101,21 +106,28 @@ public void onCompleted() {

private boolean handleRequest(
ClientStatusRequest request, StreamObserver<ClientStatusResponse> responseObserver) {
StatusException error;
try {
responseObserver.onNext(getConfigDumpForRequest(request));
dapengzhang0 marked this conversation as resolved.
Show resolved Hide resolved
return true;
sergiitk marked this conversation as resolved.
Show resolved Hide resolved
} catch (StatusException e) {
responseObserver.onError(e);
error = e;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.log(Level.FINE, "Server interrupted while building CSDS config dump", e);
error = Status.ABORTED.withDescription("Thread interrupted").withCause(e).asException();
} catch (Exception e) {
logger.log(Level.WARNING, "Unexpected error while building CSDS config dump", e);
responseObserver.onError(new StatusException(
Status.INTERNAL.withDescription("Unexpected internal error").withCause(e)));
error =
Status.INTERNAL.withDescription("Unexpected internal error").withCause(e).asException();
}

responseObserver.onError(error);
return false;
}

private ClientStatusResponse getConfigDumpForRequest(ClientStatusRequest request)
throws StatusException {
throws StatusException, InterruptedException {
if (request.getNodeMatchersCount() > 0) {
throw new StatusException(
Status.INVALID_ARGUMENT.withDescription("node_matchers not supported"));
Expand All @@ -140,16 +152,20 @@ private ClientStatusResponse getConfigDumpForRequest(ClientStatusRequest request
}

@VisibleForTesting
static ClientConfig getClientConfigForXdsClient(XdsClient xdsClient) {
static ClientConfig getClientConfigForXdsClient(XdsClient xdsClient) throws InterruptedException {
ClientConfig.Builder builder = ClientConfig.newBuilder()
.setNode(xdsClient.getBootstrapInfo().node().toEnvoyProtoNode());
for (ResourceType type : ResourceType.values()) {
if (type == ResourceType.UNKNOWN) {
continue;
}
Map<String, ResourceMetadata> metadataMap = xdsClient.getSubscribedResourcesMetadata(type);
for (String resourceName : metadataMap.keySet()) {
ResourceMetadata metadata = metadataMap.get(resourceName);

Map<ResourceType, Map<String, ResourceMetadata>> metadataByType =
awaitSubscribedResourcesMetadata(xdsClient.getSubscribedResourcesMetadataSnapshot());

for (Map.Entry<ResourceType, Map<String, ResourceMetadata>> metadataByTypeEntry
: metadataByType.entrySet()) {
ResourceType type = metadataByTypeEntry.getKey();
Map<String, ResourceMetadata> metadataByResourceName = metadataByTypeEntry.getValue();
for (Map.Entry<String, ResourceMetadata> metadataEntry : metadataByResourceName.entrySet()) {
String resourceName = metadataEntry.getKey();
ResourceMetadata metadata = metadataEntry.getValue();
GenericXdsConfig.Builder genericXdsConfigBuilder = GenericXdsConfig.newBuilder()
.setTypeUrl(type.typeUrl())
.setName(resourceName)
Expand All @@ -161,6 +177,7 @@ static ClientConfig getClientConfigForXdsClient(XdsClient xdsClient) {
.setXdsConfig(metadata.getRawResource());
}
if (metadata.getStatus() == ResourceMetadataStatus.NACKED) {
verifyNotNull(metadata.getErrorState(), "resource %s getErrorState", resourceName);
genericXdsConfigBuilder
.setErrorState(metadataUpdateFailureStateToProto(metadata.getErrorState()));
}
Expand All @@ -170,6 +187,18 @@ static ClientConfig getClientConfigForXdsClient(XdsClient xdsClient) {
return builder.build();
}

private static Map<ResourceType, Map<String, ResourceMetadata>> awaitSubscribedResourcesMetadata(
ListenableFuture<Map<ResourceType, Map<String, ResourceMetadata>>> future)
throws InterruptedException {
try {
// Normally this shouldn't take long, but add some slack for cases like a cold JVM.
return future.get(20, TimeUnit.SECONDS);
} catch (ExecutionException | TimeoutException e) {
// For CSDS' purposes, the exact reason why metadata not loaded isn't important.
throw new RuntimeException(e);
}
}

@VisibleForTesting
static ClientResourceStatus metadataStatusToClientStatus(ResourceMetadataStatus status) {
switch (status) {
Expand Down
12 changes: 11 additions & 1 deletion xds/src/main/java/io/grpc/xds/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.auto.value.AutoValue;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.Any;
import io.grpc.Status;
import io.grpc.xds.AbstractXdsClient.ResourceType;
Expand Down Expand Up @@ -494,7 +495,16 @@ TlsContextManager getTlsContextManager() {
throw new UnsupportedOperationException();
}

Map<String, ResourceMetadata> getSubscribedResourcesMetadata(ResourceType type) {
/**
* Returns a {@link ListenableFuture} to the snapshot of the subscribed resources as
* they are at the moment of the call.
*
* <p>The snapshot is a map from the "resource type" to
* a map ("resource name": "resource metadata").
*/
// Must be synchronized.
ListenableFuture<Map<ResourceType, Map<String, ResourceMetadata>>>
getSubscribedResourcesMetadataSnapshot() {
throw new UnsupportedOperationException();
}

Expand Down
41 changes: 26 additions & 15 deletions xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -379,10 +379,23 @@ protected static boolean matchErrorDetail(

private void verifySubscribedResourcesMetadataSizes(
int ldsSize, int cdsSize, int rdsSize, int edsSize) {
assertThat(xdsClient.getSubscribedResourcesMetadata(LDS)).hasSize(ldsSize);
assertThat(xdsClient.getSubscribedResourcesMetadata(CDS)).hasSize(cdsSize);
assertThat(xdsClient.getSubscribedResourcesMetadata(RDS)).hasSize(rdsSize);
assertThat(xdsClient.getSubscribedResourcesMetadata(EDS)).hasSize(edsSize);
Map<ResourceType, Map<String, ResourceMetadata>> subscribedResourcesMetadata =
awaitSubscribedResourcesMetadata();
assertThat(subscribedResourcesMetadata.get(LDS)).hasSize(ldsSize);
assertThat(subscribedResourcesMetadata.get(CDS)).hasSize(cdsSize);
assertThat(subscribedResourcesMetadata.get(RDS)).hasSize(rdsSize);
assertThat(subscribedResourcesMetadata.get(EDS)).hasSize(edsSize);
}

private Map<ResourceType, Map<String, ResourceMetadata>> awaitSubscribedResourcesMetadata() {
try {
return xdsClient.getSubscribedResourcesMetadataSnapshot().get(20, TimeUnit.SECONDS);
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw new AssertionError(e);
}
}

/** Verify the resource requested, but not updated. */
Expand Down Expand Up @@ -434,22 +447,20 @@ private void verifyResourceMetadataNacked(
private ResourceMetadata verifyResourceMetadata(
ResourceType type, String resourceName, Any rawResource, ResourceMetadataStatus status,
String versionInfo, long updateTimeNanos, boolean hasErrorState) {
ResourceMetadata resourceMetadata =
xdsClient.getSubscribedResourcesMetadata(type).get(resourceName);
assertThat(resourceMetadata).isNotNull();
ResourceMetadata metadata = awaitSubscribedResourcesMetadata().get(type).get(resourceName);
assertThat(metadata).isNotNull();
String name = type.toString() + " resource '" + resourceName + "' metadata field ";
assertWithMessage(name + "status").that(resourceMetadata.getStatus()).isEqualTo(status);
assertWithMessage(name + "version").that(resourceMetadata.getVersion()).isEqualTo(versionInfo);
assertWithMessage(name + "rawResource").that(resourceMetadata.getRawResource())
.isEqualTo(rawResource);
assertWithMessage(name + "updateTimeNanos").that(resourceMetadata.getUpdateTimeNanos())
assertWithMessage(name + "status").that(metadata.getStatus()).isEqualTo(status);
assertWithMessage(name + "version").that(metadata.getVersion()).isEqualTo(versionInfo);
assertWithMessage(name + "rawResource").that(metadata.getRawResource()).isEqualTo(rawResource);
assertWithMessage(name + "updateTimeNanos").that(metadata.getUpdateTimeNanos())
.isEqualTo(updateTimeNanos);
if (hasErrorState) {
assertWithMessage(name + "errorState").that(resourceMetadata.getErrorState()).isNotNull();
assertWithMessage(name + "errorState").that(metadata.getErrorState()).isNotNull();
} else {
assertWithMessage(name + "errorState").that(resourceMetadata.getErrorState()).isNull();
assertWithMessage(name + "errorState").that(metadata.getErrorState()).isNull();
}
return resourceMetadata;
return metadata;
}

/**
Expand Down