Skip to content

Commit

Permalink
xds: fix a concurrency issue in CSDS ClientStatus responses (#8795)
Browse files Browse the repository at this point in the history
* xds: fix a concurrency issue in CSDS ClientStatus responses

Fixes an issue with ClientXdsClient.getSubscribedResourcesMetadata()
executed out of shared synchronization context, and leading to:

- each individual config dump containing outdated data when
  an xDS resource is updated during CsdsService preparing the response
- config dumps for different services being out-of-sync with each
  other when any of the related xDS resources is updated during
  CsdsService preparing the response

The fix replaces getSubscribedResourcesMetadata(ResourceType type)
with atomic getSubscribedResourcesMetadataSnapshot() returning
a snapshot of all resources for each type as they are
at the moment of a CSDS request.
  • Loading branch information
sergiitk committed Jan 12, 2022
1 parent 69671e1 commit 7c4fe69
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 83 deletions.
35 changes: 28 additions & 7 deletions xds/src/main/java/io/grpc/xds/ClientXdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.Any;
import com.google.protobuf.Duration;
import com.google.protobuf.InvalidProtocolBufferException;
Expand Down Expand Up @@ -158,7 +161,6 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res
static boolean enableRouteLookup =
!Strings.isNullOrEmpty(System.getenv("GRPC_EXPERIMENTAL_XDS_RLS_LB"))
&& Boolean.parseBoolean(System.getenv("GRPC_EXPERIMENTAL_XDS_RLS_LB"));

private static final String TYPE_URL_HTTP_CONNECTION_MANAGER_V2 =
"type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2"
+ ".HttpConnectionManager";
Expand Down Expand Up @@ -2028,12 +2030,31 @@ public Collection<String> getSubscribedResources(ServerInfo serverInfo, Resource
}

@Override
Map<String, ResourceMetadata> getSubscribedResourcesMetadata(ResourceType type) {
Map<String, ResourceMetadata> metadataMap = new HashMap<>();
for (Map.Entry<String, ResourceSubscriber> entry : getSubscribedResourcesMap(type).entrySet()) {
metadataMap.put(entry.getKey(), entry.getValue().metadata);
}
return metadataMap;
ListenableFuture<Map<ResourceType, Map<String, ResourceMetadata>>>
getSubscribedResourcesMetadataSnapshot() {
final SettableFuture<Map<ResourceType, Map<String, ResourceMetadata>>> future =
SettableFuture.create();
syncContext.execute(new Runnable() {
@Override
public void run() {
// A map from a "resource type" to a map ("resource name": "resource metadata")
ImmutableMap.Builder<ResourceType, Map<String, ResourceMetadata>> metadataSnapshot =
ImmutableMap.builder();
for (ResourceType type : ResourceType.values()) {
if (type == ResourceType.UNKNOWN) {
continue;
}
ImmutableMap.Builder<String, ResourceMetadata> metadataMap = ImmutableMap.builder();
for (Map.Entry<String, ResourceSubscriber> resourceEntry
: getSubscribedResourcesMap(type).entrySet()) {
metadataMap.put(resourceEntry.getKey(), resourceEntry.getValue().metadata);
}
metadataSnapshot.put(type, metadataMap.build());
}
future.set(metadataSnapshot.build());
}
});
return future;
}

@Override
Expand Down
53 changes: 41 additions & 12 deletions xds/src/main/java/io/grpc/xds/CsdsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package io.grpc.xds;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Verify.verifyNotNull;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.util.Timestamps;
import io.envoyproxy.envoy.admin.v3.ClientResourceStatus;
import io.envoyproxy.envoy.service.status.v3.ClientConfig;
Expand All @@ -37,6 +39,9 @@
import io.grpc.xds.XdsClient.ResourceMetadata.UpdateFailureState;
import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -101,21 +106,28 @@ public void onCompleted() {

private boolean handleRequest(
ClientStatusRequest request, StreamObserver<ClientStatusResponse> responseObserver) {
StatusException error;
try {
responseObserver.onNext(getConfigDumpForRequest(request));
return true;
} catch (StatusException e) {
responseObserver.onError(e);
error = e;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.log(Level.FINE, "Server interrupted while building CSDS config dump", e);
error = Status.ABORTED.withDescription("Thread interrupted").withCause(e).asException();
} catch (Exception e) {
logger.log(Level.WARNING, "Unexpected error while building CSDS config dump", e);
responseObserver.onError(new StatusException(
Status.INTERNAL.withDescription("Unexpected internal error").withCause(e)));
error =
Status.INTERNAL.withDescription("Unexpected internal error").withCause(e).asException();
}

responseObserver.onError(error);
return false;
}

private ClientStatusResponse getConfigDumpForRequest(ClientStatusRequest request)
throws StatusException {
throws StatusException, InterruptedException {
if (request.getNodeMatchersCount() > 0) {
throw new StatusException(
Status.INVALID_ARGUMENT.withDescription("node_matchers not supported"));
Expand All @@ -140,16 +152,20 @@ private ClientStatusResponse getConfigDumpForRequest(ClientStatusRequest request
}

@VisibleForTesting
static ClientConfig getClientConfigForXdsClient(XdsClient xdsClient) {
static ClientConfig getClientConfigForXdsClient(XdsClient xdsClient) throws InterruptedException {
ClientConfig.Builder builder = ClientConfig.newBuilder()
.setNode(xdsClient.getBootstrapInfo().node().toEnvoyProtoNode());
for (ResourceType type : ResourceType.values()) {
if (type == ResourceType.UNKNOWN) {
continue;
}
Map<String, ResourceMetadata> metadataMap = xdsClient.getSubscribedResourcesMetadata(type);
for (String resourceName : metadataMap.keySet()) {
ResourceMetadata metadata = metadataMap.get(resourceName);

Map<ResourceType, Map<String, ResourceMetadata>> metadataByType =
awaitSubscribedResourcesMetadata(xdsClient.getSubscribedResourcesMetadataSnapshot());

for (Map.Entry<ResourceType, Map<String, ResourceMetadata>> metadataByTypeEntry
: metadataByType.entrySet()) {
ResourceType type = metadataByTypeEntry.getKey();
Map<String, ResourceMetadata> metadataByResourceName = metadataByTypeEntry.getValue();
for (Map.Entry<String, ResourceMetadata> metadataEntry : metadataByResourceName.entrySet()) {
String resourceName = metadataEntry.getKey();
ResourceMetadata metadata = metadataEntry.getValue();
GenericXdsConfig.Builder genericXdsConfigBuilder = GenericXdsConfig.newBuilder()
.setTypeUrl(type.typeUrl())
.setName(resourceName)
Expand All @@ -161,6 +177,7 @@ static ClientConfig getClientConfigForXdsClient(XdsClient xdsClient) {
.setXdsConfig(metadata.getRawResource());
}
if (metadata.getStatus() == ResourceMetadataStatus.NACKED) {
verifyNotNull(metadata.getErrorState(), "resource %s getErrorState", resourceName);
genericXdsConfigBuilder
.setErrorState(metadataUpdateFailureStateToProto(metadata.getErrorState()));
}
Expand All @@ -170,6 +187,18 @@ static ClientConfig getClientConfigForXdsClient(XdsClient xdsClient) {
return builder.build();
}

private static Map<ResourceType, Map<String, ResourceMetadata>> awaitSubscribedResourcesMetadata(
ListenableFuture<Map<ResourceType, Map<String, ResourceMetadata>>> future)
throws InterruptedException {
try {
// Normally this shouldn't take long, but add some slack for cases like a cold JVM.
return future.get(20, TimeUnit.SECONDS);
} catch (ExecutionException | TimeoutException e) {
// For CSDS' purposes, the exact reason why metadata not loaded isn't important.
throw new RuntimeException(e);
}
}

@VisibleForTesting
static ClientResourceStatus metadataStatusToClientStatus(ResourceMetadataStatus status) {
switch (status) {
Expand Down
12 changes: 11 additions & 1 deletion xds/src/main/java/io/grpc/xds/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.auto.value.AutoValue;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.Any;
import io.grpc.Status;
import io.grpc.xds.AbstractXdsClient.ResourceType;
Expand Down Expand Up @@ -494,7 +495,16 @@ TlsContextManager getTlsContextManager() {
throw new UnsupportedOperationException();
}

Map<String, ResourceMetadata> getSubscribedResourcesMetadata(ResourceType type) {
/**
* Returns a {@link ListenableFuture} to the snapshot of the subscribed resources as
* they are at the moment of the call.
*
* <p>The snapshot is a map from the "resource type" to
* a map ("resource name": "resource metadata").
*/
// Must be synchronized.
ListenableFuture<Map<ResourceType, Map<String, ResourceMetadata>>>
getSubscribedResourcesMetadataSnapshot() {
throw new UnsupportedOperationException();
}

Expand Down
41 changes: 26 additions & 15 deletions xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -379,10 +379,23 @@ protected static boolean matchErrorDetail(

private void verifySubscribedResourcesMetadataSizes(
int ldsSize, int cdsSize, int rdsSize, int edsSize) {
assertThat(xdsClient.getSubscribedResourcesMetadata(LDS)).hasSize(ldsSize);
assertThat(xdsClient.getSubscribedResourcesMetadata(CDS)).hasSize(cdsSize);
assertThat(xdsClient.getSubscribedResourcesMetadata(RDS)).hasSize(rdsSize);
assertThat(xdsClient.getSubscribedResourcesMetadata(EDS)).hasSize(edsSize);
Map<ResourceType, Map<String, ResourceMetadata>> subscribedResourcesMetadata =
awaitSubscribedResourcesMetadata();
assertThat(subscribedResourcesMetadata.get(LDS)).hasSize(ldsSize);
assertThat(subscribedResourcesMetadata.get(CDS)).hasSize(cdsSize);
assertThat(subscribedResourcesMetadata.get(RDS)).hasSize(rdsSize);
assertThat(subscribedResourcesMetadata.get(EDS)).hasSize(edsSize);
}

private Map<ResourceType, Map<String, ResourceMetadata>> awaitSubscribedResourcesMetadata() {
try {
return xdsClient.getSubscribedResourcesMetadataSnapshot().get(20, TimeUnit.SECONDS);
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw new AssertionError(e);
}
}

/** Verify the resource requested, but not updated. */
Expand Down Expand Up @@ -434,22 +447,20 @@ private void verifyResourceMetadataNacked(
private ResourceMetadata verifyResourceMetadata(
ResourceType type, String resourceName, Any rawResource, ResourceMetadataStatus status,
String versionInfo, long updateTimeNanos, boolean hasErrorState) {
ResourceMetadata resourceMetadata =
xdsClient.getSubscribedResourcesMetadata(type).get(resourceName);
assertThat(resourceMetadata).isNotNull();
ResourceMetadata metadata = awaitSubscribedResourcesMetadata().get(type).get(resourceName);
assertThat(metadata).isNotNull();
String name = type.toString() + " resource '" + resourceName + "' metadata field ";
assertWithMessage(name + "status").that(resourceMetadata.getStatus()).isEqualTo(status);
assertWithMessage(name + "version").that(resourceMetadata.getVersion()).isEqualTo(versionInfo);
assertWithMessage(name + "rawResource").that(resourceMetadata.getRawResource())
.isEqualTo(rawResource);
assertWithMessage(name + "updateTimeNanos").that(resourceMetadata.getUpdateTimeNanos())
assertWithMessage(name + "status").that(metadata.getStatus()).isEqualTo(status);
assertWithMessage(name + "version").that(metadata.getVersion()).isEqualTo(versionInfo);
assertWithMessage(name + "rawResource").that(metadata.getRawResource()).isEqualTo(rawResource);
assertWithMessage(name + "updateTimeNanos").that(metadata.getUpdateTimeNanos())
.isEqualTo(updateTimeNanos);
if (hasErrorState) {
assertWithMessage(name + "errorState").that(resourceMetadata.getErrorState()).isNotNull();
assertWithMessage(name + "errorState").that(metadata.getErrorState()).isNotNull();
} else {
assertWithMessage(name + "errorState").that(resourceMetadata.getErrorState()).isNull();
assertWithMessage(name + "errorState").that(metadata.getErrorState()).isNull();
}
return resourceMetadata;
return metadata;
}

/**
Expand Down

0 comments on commit 7c4fe69

Please sign in to comment.