From 602624887f92dd81ea0f1faf7c949d34255b9bc7 Mon Sep 17 00:00:00 2001 From: ZHANG Dapeng Date: Fri, 29 Oct 2021 10:12:38 -0700 Subject: [PATCH 1/7] rls: sync latest rls protos from grpc-proto (#8638) --- rls/src/main/proto/grpc/lookup/v1/rls.proto | 13 +++++-------- rls/src/main/proto/grpc/lookup/v1/rls_config.proto | 7 +++++++ 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/rls/src/main/proto/grpc/lookup/v1/rls.proto b/rls/src/main/proto/grpc/lookup/v1/rls.proto index d9dd6c246f24..7d1735289d56 100644 --- a/rls/src/main/proto/grpc/lookup/v1/rls.proto +++ b/rls/src/main/proto/grpc/lookup/v1/rls.proto @@ -22,14 +22,6 @@ option java_package = "io.grpc.lookup.v1"; option java_outer_classname = "RlsProto"; message RouteLookupRequest { - // Full host name of the target server, e.g. firestore.googleapis.com. - // Only set for gRPC requests; HTTP requests must use key_map explicitly. - // Deprecated in favor of setting key_map keys with GrpcKeyBuilder.extra_keys. - string server = 1 [deprecated = true]; - // Full path of the request, i.e. "/service/method". - // Only set for gRPC requests; HTTP requests must use key_map explicitly. - // Deprecated in favor of setting key_map keys with GrpcKeyBuilder.extra_keys. - string path = 2 [deprecated = true]; // Target type allows the client to specify what kind of target format it // would like from RLS to allow it to find the regional server, e.g. "grpc". string target_type = 3; @@ -41,8 +33,13 @@ message RouteLookupRequest { } // Reason for making this request. Reason reason = 5; + // For REASON_STALE, the header_data from the stale response, if any. + string stale_header_data = 6; // Map of key values extracted via key builders for the gRPC or HTTP request. map key_map = 4; + + reserved 1, 2; + reserved "server", "path"; } message RouteLookupResponse { diff --git a/rls/src/main/proto/grpc/lookup/v1/rls_config.proto b/rls/src/main/proto/grpc/lookup/v1/rls_config.proto index db99a8949ea9..9d2b6c54cfb4 100644 --- a/rls/src/main/proto/grpc/lookup/v1/rls_config.proto +++ b/rls/src/main/proto/grpc/lookup/v1/rls_config.proto @@ -216,3 +216,10 @@ message RouteLookupConfig { reserved 10; reserved "request_processing_strategy"; } + +// RouteLookupClusterSpecifier is used in xDS to represent a cluster specifier +// plugin for RLS. +message RouteLookupClusterSpecifier { + // The RLS config for this cluster specifier plugin instance. + RouteLookupConfig route_lookup_config = 1; +} From 59c6b49fd4051b8d9d5cf958a762007a2f60280c Mon Sep 17 00:00:00 2001 From: ZHANG Dapeng Date: Fri, 29 Oct 2021 11:46:00 -0700 Subject: [PATCH 2/7] xds: lazily init MessagePrinter (#8639) Just for cleanup. The printer might be used in other class e.g. to convert RLS proto to string/Map. --- .../java/io/grpc/xds/AbstractXdsClient.java | 19 +++-- .../main/java/io/grpc/xds/MessagePrinter.java | 70 ++++++++++--------- .../java/io/grpc/xds/MessagePrinterTest.java | 9 ++- 3 files changed, 54 insertions(+), 44 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java b/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java index a6c3c2feb99a..0e609ff74587 100644 --- a/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java +++ b/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java @@ -78,7 +78,6 @@ public void uncaughtException(Thread t, Throwable e) { throw new AssertionError(e); } }); - private final MessagePrinter msgPrinter = new MessagePrinter(); private final InternalLogId logId; private final XdsLogger logger; private final ManagedChannel channel; @@ -580,8 +579,9 @@ public void onNext(final io.envoyproxy.envoy.api.v2.DiscoveryResponse response) public void run() { ResourceType type = ResourceType.fromTypeUrl(response.getTypeUrl()); if (logger.isLoggable(XdsLogLevel.DEBUG)) { - logger.log(XdsLogLevel.DEBUG, "Received {0} response:\n{1}", - type, msgPrinter.print(response)); + logger.log( + XdsLogLevel.DEBUG, "Received {0} response:\n{1}", type, + MessagePrinter.print(response)); } handleRpcResponse(type, response.getVersionInfo(), response.getResourcesList(), response.getNonce()); @@ -633,7 +633,9 @@ void sendDiscoveryRequest(ResourceType type, String versionInfo, Collection Date: Mon, 1 Nov 2021 14:01:56 +0100 Subject: [PATCH 3/7] Support BinderChannelBuilder.forTarget. (#8633) Allows this class to be used with custom name resolvers. --- .../grpc/binder/BinderChannelSmokeTest.java | 14 ++- .../io/grpc/binder/BinderChannelBuilder.java | 56 ++++++++--- .../testing/FakeNameResolverProvider.java | 94 +++++++++++++++++++ 3 files changed, 151 insertions(+), 13 deletions(-) create mode 100644 testing/src/main/java/io/grpc/internal/testing/FakeNameResolverProvider.java diff --git a/binder/src/androidTest/java/io/grpc/binder/BinderChannelSmokeTest.java b/binder/src/androidTest/java/io/grpc/binder/BinderChannelSmokeTest.java index dd9cf26bd3d0..41ea76146de2 100644 --- a/binder/src/androidTest/java/io/grpc/binder/BinderChannelSmokeTest.java +++ b/binder/src/androidTest/java/io/grpc/binder/BinderChannelSmokeTest.java @@ -36,11 +36,13 @@ import io.grpc.ManagedChannel; import io.grpc.Metadata; import io.grpc.MethodDescriptor; +import io.grpc.NameResolverRegistry; import io.grpc.Server; import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptors; import io.grpc.ServerServiceDefinition; import io.grpc.internal.GrpcUtil; +import io.grpc.internal.testing.FakeNameResolverProvider; import io.grpc.stub.ClientCalls; import io.grpc.stub.ServerCalls; import io.grpc.stub.StreamObserver; @@ -66,6 +68,7 @@ public final class BinderChannelSmokeTest { private static final int SLIGHTLY_MORE_THAN_ONE_BLOCK = 16 * 1024 + 100; private static final String MSG = "Some text which will be repeated many many times"; + private static final String SERVER_TARGET_URI = "fake://server"; final MethodDescriptor method = MethodDescriptor.newBuilder(StringMarshaller.INSTANCE, StringMarshaller.INSTANCE) @@ -85,7 +88,7 @@ public final class BinderChannelSmokeTest { .setType(MethodDescriptor.MethodType.BIDI_STREAMING) .build(); - AndroidComponentAddress serverAddress; + FakeNameResolverProvider fakeNameResolverProvider; ManagedChannel channel; AtomicReference headersCapture = new AtomicReference<>(); @@ -118,6 +121,8 @@ public void setUp() throws Exception { TestUtils.recordRequestHeadersInterceptor(headersCapture)); AndroidComponentAddress serverAddress = HostServices.allocateService(appContext); + fakeNameResolverProvider = new FakeNameResolverProvider(SERVER_TARGET_URI, serverAddress); + NameResolverRegistry.getDefaultRegistry().register(fakeNameResolverProvider); HostServices.configureService(serverAddress, HostServices.serviceParamsBuilder() .setServerFactory((service, receiver) -> @@ -132,6 +137,7 @@ public void setUp() throws Exception { @After public void tearDown() throws Exception { channel.shutdownNow(); + NameResolverRegistry.getDefaultRegistry().deregister(fakeNameResolverProvider); HostServices.awaitServiceShutdown(); } @@ -192,6 +198,12 @@ public void testStreamingCallOptionHeaders() throws Exception { assertThat(headersCapture.get().get(GrpcUtil.TIMEOUT_KEY)).isGreaterThan(0); } + @Test + public void testConnectViaTargetUri() throws Exception { + channel = BinderChannelBuilder.forTarget(SERVER_TARGET_URI, appContext).build(); + assertThat(doCall("Hello").get()).isEqualTo("Hello"); + } + private static String createLargeString(int size) { StringBuilder sb = new StringBuilder(); while (sb.length() < size) { diff --git a/binder/src/main/java/io/grpc/binder/BinderChannelBuilder.java b/binder/src/main/java/io/grpc/binder/BinderChannelBuilder.java index 99191cfad3c6..91e4e8f1c76a 100644 --- a/binder/src/main/java/io/grpc/binder/BinderChannelBuilder.java +++ b/binder/src/main/java/io/grpc/binder/BinderChannelBuilder.java @@ -67,13 +67,35 @@ public final class BinderChannelBuilder *

You the caller are responsible for managing the lifecycle of any channels built by the * resulting builder. They will not be shut down automatically. * - * @param targetAddress the {@link AndroidComponentAddress} referencing the service to bind to. + * @param directAddress the {@link AndroidComponentAddress} referencing the service to bind to. * @param sourceContext the context to bind from (e.g. The current Activity or Application). * @return a new builder */ public static BinderChannelBuilder forAddress( - AndroidComponentAddress targetAddress, Context sourceContext) { - return new BinderChannelBuilder(targetAddress, sourceContext); + AndroidComponentAddress directAddress, Context sourceContext) { + return new BinderChannelBuilder( + checkNotNull(directAddress, "directAddress"), null, sourceContext); + } + + /** + * Creates a channel builder that will bind to a remote Android service, via a string + * target name which will be resolved. + * + *

The underlying Android binding will be torn down when the channel becomes idle. This happens + * after 30 minutes without use by default but can be configured via {@link + * ManagedChannelBuilder#idleTimeout(long, TimeUnit)} or triggered manually with {@link + * ManagedChannel#enterIdle()}. + * + *

You the caller are responsible for managing the lifecycle of any channels built by the + * resulting builder. They will not be shut down automatically. + * + * @param target A target uri which should resolve into an {@link AndroidComponentAddress} + * referencing the service to bind to. + * @param sourceContext the context to bind from (e.g. The current Activity or Application). + * @return a new builder + */ + public static BinderChannelBuilder forTarget(String target, Context sourceContext) { + return new BinderChannelBuilder(null, checkNotNull(target, "target"), sourceContext); } /** @@ -88,7 +110,7 @@ public static BinderChannelBuilder forAddress(String name, int port) { /** * Always fails. Call {@link #forAddress(AndroidComponentAddress, Context)} instead. */ - @DoNotCall("Unsupported. Use forAddress(AndroidComponentAddress, Context) instead") + @DoNotCall("Unsupported. Use forTarget(String, Context) instead") public static BinderChannelBuilder forTarget(String target) { throw new UnsupportedOperationException( "call forAddress(AndroidComponentAddress, Context) instead"); @@ -104,9 +126,11 @@ public static BinderChannelBuilder forTarget(String target) { private BindServiceFlags bindServiceFlags; private BinderChannelBuilder( - AndroidComponentAddress targetAddress, + @Nullable AndroidComponentAddress directAddress, + @Nullable String target, Context sourceContext) { - mainThreadExecutor = ContextCompat.getMainExecutor(sourceContext); + mainThreadExecutor = + ContextCompat.getMainExecutor(checkNotNull(sourceContext, "sourceContext")); securityPolicy = SecurityPolicies.internalOnly(); inboundParcelablePolicy = InboundParcelablePolicy.DEFAULT; bindServiceFlags = BindServiceFlags.DEFAULTS; @@ -126,12 +150,20 @@ public ClientTransportFactory buildClientTransportFactory() { } } - managedChannelImplBuilder = - new ManagedChannelImplBuilder( - targetAddress, - targetAddress.getAuthority(), - new BinderChannelTransportFactoryBuilder(), - null); + if (directAddress != null) { + managedChannelImplBuilder = + new ManagedChannelImplBuilder( + directAddress, + directAddress.getAuthority(), + new BinderChannelTransportFactoryBuilder(), + null); + } else { + managedChannelImplBuilder = + new ManagedChannelImplBuilder( + target, + new BinderChannelTransportFactoryBuilder(), + null); + } } @Override diff --git a/testing/src/main/java/io/grpc/internal/testing/FakeNameResolverProvider.java b/testing/src/main/java/io/grpc/internal/testing/FakeNameResolverProvider.java new file mode 100644 index 000000000000..d056707b7196 --- /dev/null +++ b/testing/src/main/java/io/grpc/internal/testing/FakeNameResolverProvider.java @@ -0,0 +1,94 @@ +/* + * Copyright 2021 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal.testing; + +import com.google.common.collect.ImmutableList; +import io.grpc.EquivalentAddressGroup; +import io.grpc.NameResolver; +import io.grpc.NameResolverProvider; +import io.grpc.Status; +import java.net.SocketAddress; +import java.net.URI; + +/** A name resolver to always resolve the given URI into the given address. */ +public final class FakeNameResolverProvider extends NameResolverProvider { + + private final URI targetUri; + private final SocketAddress address; + + public FakeNameResolverProvider(String targetUri, SocketAddress address) { + this.targetUri = URI.create(targetUri); + this.address = address; + } + + @Override + public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) { + if (targetUri.equals(this.targetUri)) { + return new FakeNameResolver(address); + } + return null; + } + + @Override + protected boolean isAvailable() { + return true; + } + + @Override + protected int priority() { + return 5; // Default + } + + @Override + public String getDefaultScheme() { + return targetUri.getScheme(); + } + + /** A single name resolver. */ + private static final class FakeNameResolver extends NameResolver { + private static final String AUTHORITY = "fake-authority"; + + private final SocketAddress address; + private volatile boolean shutdown; + + private FakeNameResolver(SocketAddress address) { + this.address = address; + } + + @Override + public void start(Listener2 listener) { + if (shutdown) { + listener.onError(Status.FAILED_PRECONDITION.withDescription("Resolver is shutdown")); + } else { + listener.onResult( + ResolutionResult.newBuilder() + .setAddresses(ImmutableList.of(new EquivalentAddressGroup(address))) + .build()); + } + } + + @Override + public String getServiceAuthority() { + return AUTHORITY; + } + + @Override + public void shutdown() { + shutdown = true; + } + } +} From a46560e4fc9ec4771dc7824055d23f1301dc1e3d Mon Sep 17 00:00:00 2001 From: ZHANG Dapeng Date: Mon, 1 Nov 2021 09:44:58 -0700 Subject: [PATCH 4/7] xds: refactor XdsClient in preparation to support federation (#8630) See go/java-xds-client-api-for-federation for detailed description --- .../java/io/grpc/xds/AbstractXdsClient.java | 180 ++++------ .../java/io/grpc/xds/ClientXdsClient.java | 317 +++++++++++++----- .../java/io/grpc/xds/LoadReportClient.java | 6 +- .../grpc/xds/SharedXdsClientPoolProvider.java | 35 +- xds/src/main/java/io/grpc/xds/XdsClient.java | 44 +++ .../io/grpc/xds/ClientXdsClientTestBase.java | 11 +- .../xds/SharedXdsClientPoolProviderTest.java | 6 - 7 files changed, 363 insertions(+), 236 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java b/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java index 0e609ff74587..e29949eb8e58 100644 --- a/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java +++ b/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java @@ -28,6 +28,7 @@ import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; +import io.grpc.Channel; import io.grpc.Context; import io.grpc.InternalLogId; import io.grpc.ManagedChannel; @@ -36,6 +37,11 @@ import io.grpc.SynchronizationContext.ScheduledHandle; import io.grpc.internal.BackoffPolicy; import io.grpc.stub.StreamObserver; +import io.grpc.xds.Bootstrapper.ServerInfo; +import io.grpc.xds.ClientXdsClient.XdsChannelFactory; +import io.grpc.xds.EnvoyProtoData.Node; +import io.grpc.xds.XdsClient.ResourceStore; +import io.grpc.xds.XdsClient.XdsResponseHandler; import io.grpc.xds.XdsLogger.XdsLogLevel; import java.util.Collection; import java.util.Collections; @@ -48,7 +54,7 @@ * Common base type for XdsClient implementations, which encapsulates the layer abstraction of * the xDS RPC stream. */ -abstract class AbstractXdsClient extends XdsClient { +final class AbstractXdsClient { private static final String ADS_TYPE_URL_LDS_V2 = "type.googleapis.com/envoy.api.v2.Listener"; private static final String ADS_TYPE_URL_LDS = @@ -66,26 +72,18 @@ abstract class AbstractXdsClient extends XdsClient { private static final String ADS_TYPE_URL_EDS = "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment"; - private final SynchronizationContext syncContext = new SynchronizationContext( - new Thread.UncaughtExceptionHandler() { - @Override - public void uncaughtException(Thread t, Throwable e) { - getLogger().log( - XdsLogLevel.ERROR, - "Uncaught exception in XdsClient SynchronizationContext. Panic!", - e); - // TODO(chengyuanzhang): better error handling. - throw new AssertionError(e); - } - }); + private final SynchronizationContext syncContext; private final InternalLogId logId; private final XdsLogger logger; + private final ServerInfo serverInfo; private final ManagedChannel channel; + private final XdsResponseHandler xdsResponseHandler; + private final ResourceStore resourceStore; private final Context context; private final ScheduledExecutorService timeService; private final BackoffPolicy.Provider backoffPolicyProvider; private final Stopwatch stopwatch; - private final Bootstrapper.BootstrapInfo bootstrapInfo; + private final Node bootstrapNode; // Last successfully applied version_info for each resource type. Starts with empty string. // A version_info is used to update management server with client's most recent knowledge of @@ -103,71 +101,42 @@ public void uncaughtException(Thread t, Throwable e) { @Nullable private ScheduledHandle rpcRetryTimer; - AbstractXdsClient(ManagedChannel channel, Bootstrapper.BootstrapInfo bootstrapInfo, - Context context, ScheduledExecutorService timeService, - BackoffPolicy.Provider backoffPolicyProvider, Supplier stopwatchSupplier) { - this.channel = checkNotNull(channel, "channel"); - this.bootstrapInfo = checkNotNull(bootstrapInfo, "bootstrapInfo"); + /** An entity that manages ADS RPCs over a single channel. */ + // TODO: rename to XdsChannel + AbstractXdsClient( + XdsChannelFactory xdsChannelFactory, + ServerInfo serverInfo, + Node bootstrapNode, + XdsResponseHandler xdsResponseHandler, + ResourceStore resourceStore, + Context context, + ScheduledExecutorService + timeService, + SynchronizationContext syncContext, + BackoffPolicy.Provider backoffPolicyProvider, + Supplier stopwatchSupplier) { + this.serverInfo = checkNotNull(serverInfo, "serverInfo"); + this.channel = checkNotNull(xdsChannelFactory, "xdsChannelFactory").create(serverInfo); + this.xdsResponseHandler = checkNotNull(xdsResponseHandler, "xdsResponseHandler"); + this.resourceStore = checkNotNull(resourceStore, "resourcesSubscriber"); + this.bootstrapNode = checkNotNull(bootstrapNode, "bootstrapNode"); this.context = checkNotNull(context, "context"); this.timeService = checkNotNull(timeService, "timeService"); + this.syncContext = checkNotNull(syncContext, "syncContext"); this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); stopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get(); - logId = InternalLogId.allocate("xds-client", null); + logId = InternalLogId.allocate("xds-client", serverInfo.target()); logger = XdsLogger.withLogId(logId); logger.log(XdsLogLevel.INFO, "Created"); } - /** - * Called when an LDS response is received. - */ - // Must be synchronized. - protected void handleLdsResponse(String versionInfo, List resources, String nonce) { - } - - /** - * Called when a RDS response is received. - */ - // Must be synchronized. - protected void handleRdsResponse(String versionInfo, List resources, String nonce) { - } - - /** - * Called when a CDS response is received. - */ - // Must be synchronized. - protected void handleCdsResponse(String versionInfo, List resources, String nonce) { - } - - /** - * Called when an EDS response is received. - */ - // Must be synchronized. - protected void handleEdsResponse(String versionInfo, List resources, String nonce) { - } - - /** - * Called when the ADS stream is closed passively. - */ - // Must be synchronized. - protected void handleStreamClosed(Status error) { - } - - /** - * Called when the ADS stream has been recreated. - */ - // Must be synchronized. - protected void handleStreamRestarted() { - } - - /** - * Called when being shut down. - */ - // Must be synchronized. - protected void handleShutdown() { + /** The underlying channel. */ + // Currently, only externally used for LrsClient. + Channel channel() { + return channel; } - @Override - final void shutdown() { + void shutdown() { syncContext.execute(new Runnable() { @Override public void run() { @@ -179,49 +148,28 @@ public void run() { if (rpcRetryTimer != null && rpcRetryTimer.isPending()) { rpcRetryTimer.cancel(); } - handleShutdown(); + channel.shutdown(); } }); } - @Override - boolean isShutDown() { - return shutdown; - } - - @Override - Bootstrapper.BootstrapInfo getBootstrapInfo() { - return bootstrapInfo; - } - @Override public String toString() { return logId.toString(); } - /** - * Returns the collection of resources currently subscribing to or {@code null} if not - * subscribing to any resources for the given type. - * - *

Note an empty collection indicates subscribing to resources of the given type with - * wildcard mode. - */ - // Must be synchronized. - @Nullable - abstract Collection getSubscribedResources(ResourceType type); - /** * Updates the resource subscription for the given resource type. */ // Must be synchronized. - protected final void adjustResourceSubscription(ResourceType type) { + void adjustResourceSubscription(ResourceType type) { if (isInBackoff()) { return; } if (adsStream == null) { startRpcStream(); } - Collection resources = getSubscribedResources(type); + Collection resources = resourceStore.getSubscribedResources(serverInfo, type); if (resources != null) { adsStream.sendDiscoveryRequest(type, resources); } @@ -232,7 +180,7 @@ protected final void adjustResourceSubscription(ResourceType type) { * and sends an ACK request to the management server. */ // Must be synchronized. - protected final void ackResponse(ResourceType type, String versionInfo, String nonce) { + void ackResponse(ResourceType type, String versionInfo, String nonce) { switch (type) { case LDS: ldsVersion = versionInfo; @@ -252,7 +200,7 @@ protected final void ackResponse(ResourceType type, String versionInfo, String n } logger.log(XdsLogLevel.INFO, "Sending ACK for {0} update, nonce: {1}, current version: {2}", type, nonce, versionInfo); - Collection resources = getSubscribedResources(type); + Collection resources = resourceStore.getSubscribedResources(serverInfo, type); if (resources == null) { resources = Collections.emptyList(); } @@ -264,34 +212,22 @@ protected final void ackResponse(ResourceType type, String versionInfo, String n * accepted version) to the management server. */ // Must be synchronized. - protected final void nackResponse(ResourceType type, String nonce, String errorDetail) { + void nackResponse(ResourceType type, String nonce, String errorDetail) { String versionInfo = getCurrentVersion(type); logger.log(XdsLogLevel.INFO, "Sending NACK for {0} update, nonce: {1}, current version: {2}", type, nonce, versionInfo); - Collection resources = getSubscribedResources(type); + Collection resources = resourceStore.getSubscribedResources(serverInfo, type); if (resources == null) { resources = Collections.emptyList(); } adsStream.sendDiscoveryRequest(type, versionInfo, resources, nonce, errorDetail); } - protected final SynchronizationContext getSyncContext() { - return syncContext; - } - - protected final ScheduledExecutorService getTimeService() { - return timeService; - } - - protected final XdsLogger getLogger() { - return logger; - } - /** * Returns {@code true} if the resource discovery is currently in backoff. */ // Must be synchronized. - protected final boolean isInBackoff() { + boolean isInBackoff() { return rpcRetryTimer != null && rpcRetryTimer.isPending(); } @@ -302,7 +238,7 @@ protected final boolean isInBackoff() { // Must be synchronized. private void startRpcStream() { checkState(adsStream == null, "Previous adsStream has not been cleared yet"); - if (bootstrapInfo.servers().get(0).useProtocolV3()) { + if (serverInfo.useProtocolV3()) { adsStream = new AdsStreamV3(); } else { adsStream = new AdsStreamV2(); @@ -317,8 +253,8 @@ private void startRpcStream() { stopwatch.reset().start(); } + /** Returns the latest accepted version of the given resource type. */ // Must be synchronized. - @Override String getCurrentVersion(ResourceType type) { String version; switch (type) { @@ -353,16 +289,16 @@ public void run() { if (type == ResourceType.UNKNOWN) { continue; } - Collection resources = getSubscribedResources(type); + Collection resources = resourceStore.getSubscribedResources(serverInfo, type); if (resources != null) { adsStream.sendDiscoveryRequest(type, resources); } } - handleStreamRestarted(); + xdsResponseHandler.handleStreamRestarted(serverInfo); } } - protected enum ResourceType { + enum ResourceType { UNKNOWN, LDS, RDS, CDS, EDS; String typeUrl() { @@ -488,19 +424,19 @@ final void handleRpcResponse( switch (type) { case LDS: ldsRespNonce = nonce; - handleLdsResponse(versionInfo, resources, nonce); + xdsResponseHandler.handleLdsResponse(serverInfo, versionInfo, resources, nonce); break; case RDS: rdsRespNonce = nonce; - handleRdsResponse(versionInfo, resources, nonce); + xdsResponseHandler.handleRdsResponse(serverInfo, versionInfo, resources, nonce); break; case CDS: cdsRespNonce = nonce; - handleCdsResponse(versionInfo, resources, nonce); + xdsResponseHandler.handleCdsResponse(serverInfo, versionInfo, resources, nonce); break; case EDS: edsRespNonce = nonce; - handleEdsResponse(versionInfo, resources, nonce); + xdsResponseHandler.handleEdsResponse(serverInfo, versionInfo, resources, nonce); break; case UNKNOWN: default: @@ -526,7 +462,7 @@ private void handleRpcStreamClosed(Status error) { "ADS stream closed with status {0}: {1}. Cause: {2}", error.getCode(), error.getDescription(), error.getCause()); closed = true; - handleStreamClosed(error); + xdsResponseHandler.handleStreamClosed(error); cleanUp(); if (responseReceived || retryBackoffPolicy == null) { // Reset the backoff sequence if had received a response, or backoff sequence @@ -619,7 +555,7 @@ void sendDiscoveryRequest(ResourceType type, String versionInfo, Collection serverChannelMap = new HashMap<>(); private final Map ldsResourceSubscribers = new HashMap<>(); private final Map rdsResourceSubscribers = new HashMap<>(); private final Map cdsResourceSubscribers = new HashMap<>(); private final Map edsResourceSubscribers = new HashMap<>(); private final LoadStatsManager2 loadStatsManager; - private final LoadReportClient lrsClient; + private final Map serverLrsClientMap = new HashMap<>(); + private final XdsChannelFactory xdsChannelFactory; + private final Bootstrapper.BootstrapInfo bootstrapInfo; + private final Context context; + private final ScheduledExecutorService timeService; + private final BackoffPolicy.Provider backoffPolicyProvider; + private final Supplier stopwatchSupplier; private final TimeProvider timeProvider; private boolean reportingLoad; private final TlsContextManager tlsContextManager; + private final InternalLogId logId; + private final XdsLogger logger; + private volatile boolean isShutdown; + // TODO(zdapeng): rename to XdsClientImpl ClientXdsClient( - ManagedChannel channel, Bootstrapper.BootstrapInfo bootstrapInfo, Context context, - ScheduledExecutorService timeService, BackoffPolicy.Provider backoffPolicyProvider, - Supplier stopwatchSupplier, TimeProvider timeProvider, + XdsChannelFactory xdsChannelFactory, + Bootstrapper.BootstrapInfo bootstrapInfo, + Context context, + ScheduledExecutorService timeService, + BackoffPolicy.Provider backoffPolicyProvider, + Supplier stopwatchSupplier, + TimeProvider timeProvider, TlsContextManager tlsContextManager) { - super(channel, bootstrapInfo, context, timeService, backoffPolicyProvider, stopwatchSupplier); + this.xdsChannelFactory = xdsChannelFactory; + this.bootstrapInfo = bootstrapInfo; + this.context = context; + this.timeService = timeService; loadStatsManager = new LoadStatsManager2(stopwatchSupplier); + this.backoffPolicyProvider = backoffPolicyProvider; + this.stopwatchSupplier = stopwatchSupplier; this.timeProvider = timeProvider; this.tlsContextManager = checkNotNull(tlsContextManager, "tlsContextManager"); - lrsClient = new LoadReportClient(loadStatsManager, channel, context, - bootstrapInfo.servers().get(0).useProtocolV3(), bootstrapInfo.node(), - getSyncContext(), timeService, backoffPolicyProvider, stopwatchSupplier); + logId = InternalLogId.allocate("xds-client", null); + logger = XdsLogger.withLogId(logId); + logger.log(XdsLogLevel.INFO, "Created"); + } + + private void maybeCreateXdsChannelWithLrs(ServerInfo serverInfo) { + syncContext.throwIfNotInThisSynchronizationContext(); + if (serverChannelMap.containsKey(serverInfo)) { + return; + } + AbstractXdsClient xdsChannel = new AbstractXdsClient( + xdsChannelFactory, + serverInfo, + bootstrapInfo.node(), + this, + this, + context, + timeService, + syncContext, + backoffPolicyProvider, + stopwatchSupplier); + LoadReportClient lrsClient = new LoadReportClient( + loadStatsManager, xdsChannel.channel(), context, serverInfo.useProtocolV3(), + bootstrapInfo.node(), syncContext, timeService, backoffPolicyProvider, stopwatchSupplier); + serverChannelMap.put(serverInfo, xdsChannel); + serverLrsClientMap.put(serverInfo, lrsClient); } @Override - protected void handleLdsResponse(String versionInfo, List resources, String nonce) { + public void handleLdsResponse( + ServerInfo serverInfo, String versionInfo, List resources, String nonce) { + syncContext.throwIfNotInThisSynchronizationContext(); Map parsedResources = new HashMap<>(resources.size()); Set unpackedResources = new HashSet<>(resources.size()); Set invalidResources = new HashSet<>(); @@ -233,12 +299,12 @@ protected void handleLdsResponse(String versionInfo, List resources, String // LdsUpdate parsed successfully. parsedResources.put(listenerName, new ParsedResource(ldsUpdate, resource)); } - getLogger().log(XdsLogLevel.INFO, + logger.log(XdsLogLevel.INFO, "Received LDS Response version {0} nonce {1}. Parsed resources: {2}", versionInfo, nonce, unpackedResources); handleResourceUpdate( - ResourceType.LDS, parsedResources, invalidResources, retainedRdsResources, versionInfo, - nonce, errors); + serverInfo, ResourceType.LDS, parsedResources, invalidResources, retainedRdsResources, + versionInfo, nonce, errors); } private LdsUpdate processClientSideListener( @@ -1307,7 +1373,9 @@ static StructOrError parseClusterWeight( } @Override - protected void handleRdsResponse(String versionInfo, List resources, String nonce) { + public void handleRdsResponse( + ServerInfo serverInfo, String versionInfo, List resources, String nonce) { + syncContext.throwIfNotInThisSynchronizationContext(); Map parsedResources = new HashMap<>(resources.size()); Set unpackedResources = new HashSet<>(resources.size()); Set invalidResources = new HashSet<>(); @@ -1344,12 +1412,12 @@ protected void handleRdsResponse(String versionInfo, List resources, String parsedResources.put(routeConfigName, new ParsedResource(rdsUpdate, resource)); } - getLogger().log(XdsLogLevel.INFO, + logger.log(XdsLogLevel.INFO, "Received RDS Response version {0} nonce {1}. Parsed resources: {2}", versionInfo, nonce, unpackedResources); handleResourceUpdate( - ResourceType.RDS, parsedResources, invalidResources, Collections.emptySet(), - versionInfo, nonce, errors); + serverInfo, ResourceType.RDS, parsedResources, invalidResources, + Collections.emptySet(), versionInfo, nonce, errors); } private static RdsUpdate processRouteConfiguration( @@ -1370,7 +1438,9 @@ private static RdsUpdate processRouteConfiguration( } @Override - protected void handleCdsResponse(String versionInfo, List resources, String nonce) { + public void handleCdsResponse( + ServerInfo serverInfo, String versionInfo, List resources, String nonce) { + syncContext.throwIfNotInThisSynchronizationContext(); Map parsedResources = new HashMap<>(resources.size()); Set unpackedResources = new HashSet<>(resources.size()); Set invalidResources = new HashSet<>(); @@ -1415,12 +1485,12 @@ protected void handleCdsResponse(String versionInfo, List resources, String } parsedResources.put(clusterName, new ParsedResource(cdsUpdate, resource)); } - getLogger().log(XdsLogLevel.INFO, + logger.log(XdsLogLevel.INFO, "Received CDS Response version {0} nonce {1}. Parsed resources: {2}", versionInfo, nonce, unpackedResources); handleResourceUpdate( - ResourceType.CDS, parsedResources, invalidResources, retainedEdsResources, versionInfo, - nonce, errors); + serverInfo, ResourceType.CDS, parsedResources, invalidResources, retainedEdsResources, + versionInfo, nonce, errors); } @VisibleForTesting @@ -1598,7 +1668,9 @@ private static StructOrError parseNonAggregateCluster( } @Override - protected void handleEdsResponse(String versionInfo, List resources, String nonce) { + public void handleEdsResponse( + ServerInfo serverInfo, String versionInfo, List resources, String nonce) { + syncContext.throwIfNotInThisSynchronizationContext(); Map parsedResources = new HashMap<>(resources.size()); Set unpackedResources = new HashSet<>(resources.size()); Set invalidResources = new HashSet<>(); @@ -1641,12 +1713,12 @@ protected void handleEdsResponse(String versionInfo, List resources, String } parsedResources.put(clusterName, new ParsedResource(edsUpdate, resource)); } - getLogger().log( + logger.log( XdsLogLevel.INFO, "Received EDS Response version {0} nonce {1}. Parsed resources: {2}", versionInfo, nonce, unpackedResources); handleResourceUpdate( - ResourceType.EDS, parsedResources, invalidResources, Collections.emptySet(), - versionInfo, nonce, errors); + serverInfo, ResourceType.EDS, parsedResources, invalidResources, + Collections.emptySet(), versionInfo, nonce, errors); } private static EdsUpdate processClusterLoadAssignment(ClusterLoadAssignment assignment) @@ -1775,7 +1847,8 @@ private static int getRatePerMillion(FractionalPercent percent) { } @Override - protected void handleStreamClosed(Status error) { + public void handleStreamClosed(Status error) { + syncContext.throwIfNotInThisSynchronizationContext(); cleanUpResourceTimers(); for (ResourceSubscriber subscriber : ldsResourceSubscribers.values()) { subscriber.onError(error); @@ -1792,27 +1865,56 @@ protected void handleStreamClosed(Status error) { } @Override - protected void handleStreamRestarted() { + public void handleStreamRestarted(ServerInfo serverInfo) { + syncContext.throwIfNotInThisSynchronizationContext(); for (ResourceSubscriber subscriber : ldsResourceSubscribers.values()) { - subscriber.restartTimer(); + if (subscriber.serverInfo.equals(serverInfo)) { + subscriber.restartTimer(); + } } for (ResourceSubscriber subscriber : rdsResourceSubscribers.values()) { - subscriber.restartTimer(); + if (subscriber.serverInfo.equals(serverInfo)) { + subscriber.restartTimer(); + } } for (ResourceSubscriber subscriber : cdsResourceSubscribers.values()) { - subscriber.restartTimer(); + if (subscriber.serverInfo.equals(serverInfo)) { + subscriber.restartTimer(); + } } for (ResourceSubscriber subscriber : edsResourceSubscribers.values()) { - subscriber.restartTimer(); + if (subscriber.serverInfo.equals(serverInfo)) { + subscriber.restartTimer(); + } } } @Override - protected void handleShutdown() { - if (reportingLoad) { - lrsClient.stopLoadReporting(); - } - cleanUpResourceTimers(); + void shutdown() { + syncContext.execute( + new Runnable() { + @Override + public void run() { + if (isShutdown) { + return; + } + isShutdown = true; + for (AbstractXdsClient xdsChannel : serverChannelMap.values()) { + xdsChannel.shutdown(); + } + if (reportingLoad) { + for (final LoadReportClient lrsClient : serverLrsClientMap.values()) { + lrsClient.stopLoadReporting(); + } + } + cleanUpResourceTimers(); + } + }); + } + + @Override + boolean isShutDown() { + return isShutdown; } private Map getSubscribedResourcesMap(ResourceType type) { @@ -1833,9 +1935,16 @@ private Map getSubscribedResourcesMap(ResourceType t @Nullable @Override - Collection getSubscribedResources(ResourceType type) { + public Collection getSubscribedResources(ServerInfo serverInfo, ResourceType type) { Map resources = getSubscribedResourcesMap(type); - return resources.isEmpty() ? null : resources.keySet(); + ImmutableSet.Builder builder = ImmutableSet.builder(); + for (String key : resources.keySet()) { + if (resources.get(key).serverInfo.equals(serverInfo)) { + builder.add(key); + } + } + Collection retVal = builder.build(); + return retVal.isEmpty() ? null : retVal; } @Override @@ -1854,15 +1963,15 @@ TlsContextManager getTlsContextManager() { @Override void watchLdsResource(final String resourceName, final LdsResourceWatcher watcher) { - getSyncContext().execute(new Runnable() { + syncContext.execute(new Runnable() { @Override public void run() { ResourceSubscriber subscriber = ldsResourceSubscribers.get(resourceName); if (subscriber == null) { - getLogger().log(XdsLogLevel.INFO, "Subscribe LDS resource {0}", resourceName); + logger.log(XdsLogLevel.INFO, "Subscribe LDS resource {0}", resourceName); subscriber = new ResourceSubscriber(ResourceType.LDS, resourceName); ldsResourceSubscribers.put(resourceName, subscriber); - adjustResourceSubscription(ResourceType.LDS); + subscriber.xdsChannel.adjustResourceSubscription(ResourceType.LDS); } subscriber.addWatcher(watcher); } @@ -1871,16 +1980,16 @@ public void run() { @Override void cancelLdsResourceWatch(final String resourceName, final LdsResourceWatcher watcher) { - getSyncContext().execute(new Runnable() { + syncContext.execute(new Runnable() { @Override public void run() { ResourceSubscriber subscriber = ldsResourceSubscribers.get(resourceName); subscriber.removeWatcher(watcher); if (!subscriber.isWatched()) { subscriber.stopTimer(); - getLogger().log(XdsLogLevel.INFO, "Unsubscribe LDS resource {0}", resourceName); + logger.log(XdsLogLevel.INFO, "Unsubscribe LDS resource {0}", resourceName); ldsResourceSubscribers.remove(resourceName); - adjustResourceSubscription(ResourceType.LDS); + subscriber.xdsChannel.adjustResourceSubscription(ResourceType.LDS); } } }); @@ -1888,15 +1997,15 @@ public void run() { @Override void watchRdsResource(final String resourceName, final RdsResourceWatcher watcher) { - getSyncContext().execute(new Runnable() { + syncContext.execute(new Runnable() { @Override public void run() { ResourceSubscriber subscriber = rdsResourceSubscribers.get(resourceName); if (subscriber == null) { - getLogger().log(XdsLogLevel.INFO, "Subscribe RDS resource {0}", resourceName); + logger.log(XdsLogLevel.INFO, "Subscribe RDS resource {0}", resourceName); subscriber = new ResourceSubscriber(ResourceType.RDS, resourceName); rdsResourceSubscribers.put(resourceName, subscriber); - adjustResourceSubscription(ResourceType.RDS); + subscriber.xdsChannel.adjustResourceSubscription(ResourceType.RDS); } subscriber.addWatcher(watcher); } @@ -1905,16 +2014,16 @@ public void run() { @Override void cancelRdsResourceWatch(final String resourceName, final RdsResourceWatcher watcher) { - getSyncContext().execute(new Runnable() { + syncContext.execute(new Runnable() { @Override public void run() { ResourceSubscriber subscriber = rdsResourceSubscribers.get(resourceName); subscriber.removeWatcher(watcher); if (!subscriber.isWatched()) { subscriber.stopTimer(); - getLogger().log(XdsLogLevel.INFO, "Unsubscribe RDS resource {0}", resourceName); + logger.log(XdsLogLevel.INFO, "Unsubscribe RDS resource {0}", resourceName); rdsResourceSubscribers.remove(resourceName); - adjustResourceSubscription(ResourceType.RDS); + subscriber.xdsChannel.adjustResourceSubscription(ResourceType.RDS); } } }); @@ -1922,15 +2031,15 @@ public void run() { @Override void watchCdsResource(final String resourceName, final CdsResourceWatcher watcher) { - getSyncContext().execute(new Runnable() { + syncContext.execute(new Runnable() { @Override public void run() { ResourceSubscriber subscriber = cdsResourceSubscribers.get(resourceName); if (subscriber == null) { - getLogger().log(XdsLogLevel.INFO, "Subscribe CDS resource {0}", resourceName); + logger.log(XdsLogLevel.INFO, "Subscribe CDS resource {0}", resourceName); subscriber = new ResourceSubscriber(ResourceType.CDS, resourceName); cdsResourceSubscribers.put(resourceName, subscriber); - adjustResourceSubscription(ResourceType.CDS); + subscriber.xdsChannel.adjustResourceSubscription(ResourceType.CDS); } subscriber.addWatcher(watcher); } @@ -1939,16 +2048,16 @@ public void run() { @Override void cancelCdsResourceWatch(final String resourceName, final CdsResourceWatcher watcher) { - getSyncContext().execute(new Runnable() { + syncContext.execute(new Runnable() { @Override public void run() { ResourceSubscriber subscriber = cdsResourceSubscribers.get(resourceName); subscriber.removeWatcher(watcher); if (!subscriber.isWatched()) { subscriber.stopTimer(); - getLogger().log(XdsLogLevel.INFO, "Unsubscribe CDS resource {0}", resourceName); + logger.log(XdsLogLevel.INFO, "Unsubscribe CDS resource {0}", resourceName); cdsResourceSubscribers.remove(resourceName); - adjustResourceSubscription(ResourceType.CDS); + subscriber.xdsChannel.adjustResourceSubscription(ResourceType.CDS); } } }); @@ -1956,15 +2065,15 @@ public void run() { @Override void watchEdsResource(final String resourceName, final EdsResourceWatcher watcher) { - getSyncContext().execute(new Runnable() { + syncContext.execute(new Runnable() { @Override public void run() { ResourceSubscriber subscriber = edsResourceSubscribers.get(resourceName); if (subscriber == null) { - getLogger().log(XdsLogLevel.INFO, "Subscribe EDS resource {0}", resourceName); + logger.log(XdsLogLevel.INFO, "Subscribe EDS resource {0}", resourceName); subscriber = new ResourceSubscriber(ResourceType.EDS, resourceName); edsResourceSubscribers.put(resourceName, subscriber); - adjustResourceSubscription(ResourceType.EDS); + subscriber.xdsChannel.adjustResourceSubscription(ResourceType.EDS); } subscriber.addWatcher(watcher); } @@ -1973,30 +2082,32 @@ public void run() { @Override void cancelEdsResourceWatch(final String resourceName, final EdsResourceWatcher watcher) { - getSyncContext().execute(new Runnable() { + syncContext.execute(new Runnable() { @Override public void run() { ResourceSubscriber subscriber = edsResourceSubscribers.get(resourceName); subscriber.removeWatcher(watcher); if (!subscriber.isWatched()) { subscriber.stopTimer(); - getLogger().log(XdsLogLevel.INFO, "Unsubscribe EDS resource {0}", resourceName); + logger.log(XdsLogLevel.INFO, "Unsubscribe EDS resource {0}", resourceName); edsResourceSubscribers.remove(resourceName); - adjustResourceSubscription(ResourceType.EDS); + subscriber.xdsChannel.adjustResourceSubscription(ResourceType.EDS); } } }); } @Override - ClusterDropStats addClusterDropStats(String clusterName, @Nullable String edsServiceName) { + ClusterDropStats addClusterDropStats( + String clusterName, @Nullable String edsServiceName) { ClusterDropStats dropCounter = loadStatsManager.getClusterDropStats(clusterName, edsServiceName); - getSyncContext().execute(new Runnable() { + syncContext.execute(new Runnable() { @Override public void run() { if (!reportingLoad) { - lrsClient.startLoadReporting(); + // TODO(https://github.com/grpc/grpc-java/issues/8628): consume ServerInfo arg. + serverLrsClientMap.values().iterator().next().startLoadReporting(); reportingLoad = true; } } @@ -2005,15 +2116,17 @@ public void run() { } @Override - ClusterLocalityStats addClusterLocalityStats(String clusterName, - @Nullable String edsServiceName, Locality locality) { + ClusterLocalityStats addClusterLocalityStats( + String clusterName, @Nullable String edsServiceName, + Locality locality) { ClusterLocalityStats loadCounter = loadStatsManager.getClusterLocalityStats(clusterName, edsServiceName, locality); - getSyncContext().execute(new Runnable() { + syncContext.execute(new Runnable() { @Override public void run() { if (!reportingLoad) { - lrsClient.startLoadReporting(); + // TODO(https://github.com/grpc/grpc-java/issues/8628): consume ServerInfo arg. + serverLrsClientMap.values().iterator().next().startLoadReporting(); reportingLoad = true; } } @@ -2021,6 +2134,25 @@ public void run() { return loadCounter; } + @Override + Bootstrapper.BootstrapInfo getBootstrapInfo() { + return bootstrapInfo; + } + + // TODO(https://github.com/grpc/grpc-java/issues/8629): remove this + @Override + String getCurrentVersion(ResourceType type) { + if (serverChannelMap.isEmpty()) { + return ""; + } + return serverChannelMap.values().iterator().next().getCurrentVersion(type); + } + + @Override + public String toString() { + return logId.toString(); + } + private void cleanUpResourceTimers() { for (ResourceSubscriber subscriber : ldsResourceSubscribers.values()) { subscriber.stopTimer(); @@ -2037,18 +2169,19 @@ private void cleanUpResourceTimers() { } private void handleResourceUpdate( - ResourceType type, Map parsedResources, Set invalidResources, - Set retainedResources, String version, String nonce, List errors) { + ServerInfo serverInfo, ResourceType type, Map parsedResources, + Set invalidResources, Set retainedResources, String version, String nonce, + List errors) { String errorDetail = null; if (errors.isEmpty()) { checkArgument(invalidResources.isEmpty(), "found invalid resources but missing errors"); - ackResponse(type, version, nonce); + serverChannelMap.get(serverInfo).ackResponse(type, version, nonce); } else { errorDetail = Joiner.on('\n').join(errors); - getLogger().log(XdsLogLevel.WARNING, + logger.log(XdsLogLevel.WARNING, "Failed processing {0} Response version {1} nonce {2}. Errors:\n{3}", type, version, nonce, errorDetail); - nackResponse(type, nonce, errorDetail); + serverChannelMap.get(serverInfo).nackResponse(type, nonce, errorDetail); } long updateTime = timeProvider.currentTimeNanos(); for (Map.Entry entry : getSubscribedResourcesMap(type).entrySet()) { @@ -2123,6 +2256,8 @@ private Any getRawResource() { * Tracks a single subscribed resource. */ private final class ResourceSubscriber { + private final ServerInfo serverInfo; + private final AbstractXdsClient xdsChannel; private final ResourceType type; private final String resource; private final Set watchers = new HashSet<>(); @@ -2132,17 +2267,26 @@ private final class ResourceSubscriber { private ResourceMetadata metadata; ResourceSubscriber(ResourceType type, String resource) { + syncContext.throwIfNotInThisSynchronizationContext(); this.type = type; this.resource = resource; + this.serverInfo = getServerInfo(); // Initialize metadata in UNKNOWN state to cover the case when resource subscriber, // is created but not yet requested because the client is in backoff. this.metadata = ResourceMetadata.newResourceMetadataUnknown(); - if (isInBackoff()) { + maybeCreateXdsChannelWithLrs(serverInfo); + this.xdsChannel = serverChannelMap.get(serverInfo); + if (xdsChannel.isInBackoff()) { return; } restartTimer(); } + // TODO(zdapeng): add resourceName arg and support xdstp:// resources + private ServerInfo getServerInfo() { + return bootstrapInfo.servers().get(0); // use first server + } + void addWatcher(ResourceWatcher watcher) { checkArgument(!watchers.contains(watcher), "watcher %s already registered", watcher); watchers.add(watcher); @@ -2165,7 +2309,7 @@ void restartTimer() { class ResourceNotFound implements Runnable { @Override public void run() { - getLogger().log(XdsLogLevel.INFO, "{0} resource {1} initial fetch timeout", + logger.log(XdsLogLevel.INFO, "{0} resource {1} initial fetch timeout", type, resource); respTimer = null; onAbsent(); @@ -2179,9 +2323,9 @@ public String toString() { // Initial fetch scheduled or rescheduled, transition metadata state to REQUESTED. metadata = ResourceMetadata.newResourceMetadataRequested(); - respTimer = getSyncContext().schedule( + respTimer = syncContext.schedule( new ResourceNotFound(), INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, - getTimeService()); + timeService); } void stopTimer() { @@ -2216,7 +2360,7 @@ void onAbsent() { if (respTimer != null && respTimer.isPending()) { // too early to conclude absence return; } - getLogger().log(XdsLogLevel.INFO, "Conclude {0} resource {1} not exist", type, resource); + logger.log(XdsLogLevel.INFO, "Conclude {0} resource {1} not exist", type, resource); if (!absent) { data = null; absent = true; @@ -2324,4 +2468,19 @@ String getErrorDetail() { return errorDetail; } } + + abstract static class XdsChannelFactory { + static final XdsChannelFactory DEFAULT_XDS_CHANNEL_FACTORY = new XdsChannelFactory() { + @Override + ManagedChannel create(ServerInfo serverInfo) { + String target = serverInfo.target(); + ChannelCredentials channelCredentials = serverInfo.channelCredentials(); + return Grpc.newChannelBuilder(target, channelCredentials) + .keepAliveTime(5, TimeUnit.MINUTES) + .build(); + } + }; + + abstract ManagedChannel create(ServerInfo serverInfo); + } } diff --git a/xds/src/main/java/io/grpc/xds/LoadReportClient.java b/xds/src/main/java/io/grpc/xds/LoadReportClient.java index 54fa20128bc0..af2a673e9f7b 100644 --- a/xds/src/main/java/io/grpc/xds/LoadReportClient.java +++ b/xds/src/main/java/io/grpc/xds/LoadReportClient.java @@ -28,9 +28,9 @@ import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc.LoadReportingServiceStub; import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest; import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse; +import io.grpc.Channel; import io.grpc.Context; import io.grpc.InternalLogId; -import io.grpc.ManagedChannel; import io.grpc.Status; import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext.ScheduledHandle; @@ -55,7 +55,7 @@ final class LoadReportClient { private final InternalLogId logId; private final XdsLogger logger; - private final ManagedChannel channel; + private final Channel channel; private final Context context; private final boolean useProtocolV3; private final Node node; @@ -75,7 +75,7 @@ final class LoadReportClient { LoadReportClient( LoadStatsManager2 loadStatsManager, - ManagedChannel channel, + Channel channel, Context context, boolean useProtocolV3, Node node, diff --git a/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java b/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java index 14bdced5dac5..1c8fe0bad6d2 100644 --- a/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java +++ b/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java @@ -19,22 +19,18 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.annotations.VisibleForTesting; -import io.grpc.ChannelCredentials; import io.grpc.Context; -import io.grpc.Grpc; -import io.grpc.ManagedChannel; import io.grpc.internal.ExponentialBackoffPolicy; import io.grpc.internal.GrpcUtil; import io.grpc.internal.ObjectPool; import io.grpc.internal.SharedResourceHolder; import io.grpc.internal.TimeProvider; import io.grpc.xds.Bootstrapper.BootstrapInfo; -import io.grpc.xds.Bootstrapper.ServerInfo; +import io.grpc.xds.ClientXdsClient.XdsChannelFactory; import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory; import io.grpc.xds.internal.sds.TlsContextManagerImpl; import java.util.Map; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; @@ -113,8 +109,6 @@ static class RefCountedXdsClientObjectPool implements ObjectPool { @GuardedBy("lock") private ScheduledExecutorService scheduler; @GuardedBy("lock") - private ManagedChannel channel; - @GuardedBy("lock") private XdsClient xdsClient; @GuardedBy("lock") private int refCount; @@ -128,16 +122,16 @@ static class RefCountedXdsClientObjectPool implements ObjectPool { public XdsClient getObject() { synchronized (lock) { if (refCount == 0) { - ServerInfo serverInfo = bootstrapInfo.servers().get(0); // use first server - String target = serverInfo.target(); - ChannelCredentials channelCredentials = serverInfo.channelCredentials(); - channel = Grpc.newChannelBuilder(target, channelCredentials) - .keepAliveTime(5, TimeUnit.MINUTES) - .build(); scheduler = SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE); - xdsClient = new ClientXdsClient(channel, bootstrapInfo, context, scheduler, - new ExponentialBackoffPolicy.Provider(), GrpcUtil.STOPWATCH_SUPPLIER, - TimeProvider.SYSTEM_TIME_PROVIDER, new TlsContextManagerImpl(bootstrapInfo)); + xdsClient = new ClientXdsClient( + XdsChannelFactory.DEFAULT_XDS_CHANNEL_FACTORY, + bootstrapInfo, + context, + scheduler, + new ExponentialBackoffPolicy.Provider(), + GrpcUtil.STOPWATCH_SUPPLIER, + TimeProvider.SYSTEM_TIME_PROVIDER, + new TlsContextManagerImpl(bootstrapInfo)); } refCount++; return xdsClient; @@ -151,21 +145,12 @@ public XdsClient returnObject(Object object) { if (refCount == 0) { xdsClient.shutdown(); xdsClient = null; - channel.shutdown(); scheduler = SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, scheduler); } return null; } } - @VisibleForTesting - @Nullable - ManagedChannel getChannelForTest() { - synchronized (lock) { - return channel; - } - } - @VisibleForTesting @Nullable XdsClient getXdsClientForTest() { diff --git a/xds/src/main/java/io/grpc/xds/XdsClient.java b/xds/src/main/java/io/grpc/xds/XdsClient.java index 6b6be57f0429..1daa257e54ea 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/XdsClient.java @@ -24,6 +24,7 @@ import com.google.protobuf.Any; import io.grpc.Status; import io.grpc.xds.AbstractXdsClient.ResourceType; +import io.grpc.xds.Bootstrapper.ServerInfo; import io.grpc.xds.Endpoints.DropOverload; import io.grpc.xds.Endpoints.LocalityLbEndpoints; import io.grpc.xds.EnvoyServerProtoData.Listener; @@ -31,6 +32,7 @@ import io.grpc.xds.LoadStatsManager2.ClusterDropStats; import io.grpc.xds.LoadStatsManager2.ClusterLocalityStats; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; @@ -495,6 +497,7 @@ TlsContextManager getTlsContextManager() { /** * Returns the latest accepted version of the given resource type. */ + // TODO(https://github.com/grpc/grpc-java/issues/8629): remove this String getCurrentVersion(ResourceType type) { throw new UnsupportedOperationException(); } @@ -566,6 +569,7 @@ void cancelEdsResourceWatch(String resourceName, EdsResourceWatcher watcher) { * use {@link ClusterDropStats#release} to release its hard reference when it is safe to * stop reporting dropped RPCs for the specified cluster in the future. */ + // TODO(https://github.com/grpc/grpc-java/issues/8628): add ServerInfo arg ClusterDropStats addClusterDropStats(String clusterName, @Nullable String edsServiceName) { throw new UnsupportedOperationException(); } @@ -578,8 +582,48 @@ ClusterDropStats addClusterDropStats(String clusterName, @Nullable String edsSer * reference when it is safe to stop reporting RPC loads for the specified locality in the * future. */ + // TODO(https://github.com/grpc/grpc-java/issues/8628): add ServerInfo arg ClusterLocalityStats addClusterLocalityStats( String clusterName, @Nullable String edsServiceName, Locality locality) { throw new UnsupportedOperationException(); } + + interface XdsResponseHandler { + /** Called when an LDS response is received. */ + void handleLdsResponse( + ServerInfo serverInfo, String versionInfo, List resources, String nonce); + + /** Called when an RDS response is received. */ + void handleRdsResponse( + ServerInfo serverInfo, String versionInfo, List resources, String nonce); + + /** Called when an CDS response is received. */ + void handleCdsResponse( + ServerInfo serverInfo, String versionInfo, List resources, String nonce); + + /** Called when an EDS response is received. */ + void handleEdsResponse( + ServerInfo serverInfo, String versionInfo, List resources, String nonce); + + /** Called when the ADS stream is closed passively. */ + // Must be synchronized. + void handleStreamClosed(Status error); + + /** Called when the ADS stream has been recreated. */ + // Must be synchronized. + void handleStreamRestarted(ServerInfo serverInfo); + } + + interface ResourceStore { + /** + * Returns the collection of resources currently subscribing to or {@code null} if not + * subscribing to any resources for the given type. + * + *

Note an empty collection indicates subscribing to resources of the given type with + * wildcard mode. + */ + // Must be synchronized. + @Nullable + Collection getSubscribedResources(ServerInfo serverInfo, ResourceType type); + } } diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java index 5fa9e3da734a..9809738c68d6 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java @@ -59,6 +59,8 @@ import io.grpc.testing.GrpcCleanupRule; import io.grpc.xds.AbstractXdsClient.ResourceType; import io.grpc.xds.Bootstrapper.CertificateProviderInfo; +import io.grpc.xds.Bootstrapper.ServerInfo; +import io.grpc.xds.ClientXdsClient.XdsChannelFactory; import io.grpc.xds.Endpoints.DropOverload; import io.grpc.xds.Endpoints.LbEndpoint; import io.grpc.xds.Endpoints.LocalityLbEndpoints; @@ -272,6 +274,12 @@ public void setUp() throws IOException { .start()); channel = cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()); + XdsChannelFactory xdsChannelFactory = new XdsChannelFactory() { + @Override + ManagedChannel create(ServerInfo serverInfo) { + return channel; + } + }; Bootstrapper.BootstrapInfo bootstrapInfo = Bootstrapper.BootstrapInfo.builder() @@ -284,7 +292,7 @@ public void setUp() throws IOException { .build(); xdsClient = new ClientXdsClient( - channel, + xdsChannelFactory, bootstrapInfo, Context.ROOT, fakeClock.getScheduledExecutorService(), @@ -2325,6 +2333,7 @@ public void streamClosedAndRetryRestartsResourceInitialFetchTimerForUnresolvedRe @Test public void reportLoadStatsToServer() { + xdsClient.watchLdsResource(LDS_RESOURCE, ldsResourceWatcher); String clusterName = "cluster-foo.googleapis.com"; ClusterDropStats dropStats = xdsClient.addClusterDropStats(clusterName, null); LrsRpcCall lrsCall = loadReportCalls.poll(); diff --git a/xds/src/test/java/io/grpc/xds/SharedXdsClientPoolProviderTest.java b/xds/src/test/java/io/grpc/xds/SharedXdsClientPoolProviderTest.java index 6a3cba4ac351..14a8f1ce7438 100644 --- a/xds/src/test/java/io/grpc/xds/SharedXdsClientPoolProviderTest.java +++ b/xds/src/test/java/io/grpc/xds/SharedXdsClientPoolProviderTest.java @@ -23,7 +23,6 @@ import static org.mockito.Mockito.when; import io.grpc.InsecureChannelCredentials; -import io.grpc.ManagedChannel; import io.grpc.internal.ObjectPool; import io.grpc.xds.Bootstrapper.BootstrapInfo; import io.grpc.xds.Bootstrapper.ServerInfo; @@ -90,7 +89,6 @@ public void refCountedXdsClientObjectPool_delayedCreation() { BootstrapInfo.builder().servers(Collections.singletonList(server)).node(node).build(); RefCountedXdsClientObjectPool xdsClientPool = new RefCountedXdsClientObjectPool(bootstrapInfo); assertThat(xdsClientPool.getXdsClientForTest()).isNull(); - assertThat(xdsClientPool.getChannelForTest()).isNull(); XdsClient xdsClient = xdsClientPool.getObject(); assertThat(xdsClientPool.getXdsClientForTest()).isNotNull(); xdsClientPool.returnObject(xdsClient); @@ -113,7 +111,6 @@ public void refCountedXdsClientObjectPool_refCounted() { // returnObject twice assertThat(xdsClientPool.returnObject(xdsClient)).isNull(); assertThat(xdsClient.isShutDown()).isTrue(); - assertThat(xdsClientPool.getChannelForTest().isShutdown()).isTrue(); } @Test @@ -123,14 +120,11 @@ public void refCountedXdsClientObjectPool_getObjectCreatesNewInstanceIfAlreadySh BootstrapInfo.builder().servers(Collections.singletonList(server)).node(node).build(); RefCountedXdsClientObjectPool xdsClientPool = new RefCountedXdsClientObjectPool(bootstrapInfo); XdsClient xdsClient1 = xdsClientPool.getObject(); - ManagedChannel channel1 = xdsClientPool.getChannelForTest(); assertThat(xdsClientPool.returnObject(xdsClient1)).isNull(); assertThat(xdsClient1.isShutDown()).isTrue(); - assertThat(channel1.isShutdown()).isTrue(); XdsClient xdsClient2 = xdsClientPool.getObject(); assertThat(xdsClient2).isNotSameInstanceAs(xdsClient1); - assertThat(xdsClientPool.getChannelForTest()).isNotSameInstanceAs(channel1); xdsClientPool.returnObject(xdsClient2); } } From 746501dff6ecf398cfbd0224a882536b9e808b43 Mon Sep 17 00:00:00 2001 From: markb74 <57717302+markb74@users.noreply.github.com> Date: Mon, 1 Nov 2021 18:57:30 +0100 Subject: [PATCH 5/7] binder: SecurityPolicy updates (take 2). (#8637) The previous attempt at this CL relied on guava's Hashing class which is still in beta. This update compares Signature objects directly instead of SHA256 hashs, removing the need for the Hashing class. Add additional comments to the security policy class, to mention that implementing new policies requires significant care. With that in mind, add security policies to check the peer app's signature, so people can create cross-app communication without having to implement their own policy. Finally, add the UntrustedSecurityPolicies class, since that's inevitably a policy which is sometimes needed. --- .../java/io/grpc/binder/SecurityPolicies.java | 132 ++++++++++++++++++ .../java/io/grpc/binder/SecurityPolicy.java | 5 + .../binder/UntrustedSecurityPolicies.java | 47 +++++++ .../io/grpc/binder/SecurityPoliciesTest.java | 118 ++++++++++++++++ 4 files changed, 302 insertions(+) create mode 100644 binder/src/main/java/io/grpc/binder/UntrustedSecurityPolicies.java diff --git a/binder/src/main/java/io/grpc/binder/SecurityPolicies.java b/binder/src/main/java/io/grpc/binder/SecurityPolicies.java index be46b9e3e54c..dcf36be00ca6 100644 --- a/binder/src/main/java/io/grpc/binder/SecurityPolicies.java +++ b/binder/src/main/java/io/grpc/binder/SecurityPolicies.java @@ -16,9 +16,20 @@ package io.grpc.binder; +import android.annotation.SuppressLint; +import android.content.pm.PackageInfo; +import android.content.pm.PackageManager; +import android.content.pm.PackageManager.NameNotFoundException; +import android.content.pm.Signature; +import android.os.Build; import android.os.Process; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import io.grpc.ExperimentalApi; import io.grpc.Status; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; import javax.annotation.CheckReturnValue; /** Static factory methods for creating standard security policies. */ @@ -55,4 +66,125 @@ public Status checkAuthorization(int uid) { } }; } + + /** + * Creates a {@link SecurityPolicy} which checks if the package signature + * matches {@code requiredSignature}. + * + * @param packageName the package name of the allowed package. + * @param requiredSignature the allowed signature of the allowed package. + * @throws NullPointerException if any of the inputs are {@code null}. + */ + public static SecurityPolicy hasSignature( + PackageManager packageManager, String packageName, Signature requiredSignature) { + return oneOfSignatures( + packageManager, packageName, ImmutableList.of(requiredSignature)); + } + + /** + * Creates a {@link SecurityPolicy} which checks if the package signature + * matches any of {@code requiredSignatures}. + * + * @param packageName the package name of the allowed package. + * @param requiredSignatures the allowed signatures of the allowed package. + * @throws NullPointerException if any of the inputs are {@code null}. + * @throws IllegalArgumentException if {@code requiredSignatures} is empty. + */ + public static SecurityPolicy oneOfSignatures( + PackageManager packageManager, + String packageName, + Collection requiredSignatures) { + Preconditions.checkNotNull(packageManager, "packageManager"); + Preconditions.checkNotNull(packageName, "packageName"); + Preconditions.checkNotNull(requiredSignatures, "requiredSignatures"); + Preconditions.checkArgument(!requiredSignatures.isEmpty(), + "requiredSignatures"); + ImmutableList requiredSignaturesImmutable = ImmutableList.copyOf(requiredSignatures); + + for (Signature requiredSignature : requiredSignaturesImmutable) { + Preconditions.checkNotNull(requiredSignature); + } + + return new SecurityPolicy() { + @Override + public Status checkAuthorization(int uid) { + return checkUidSignature( + packageManager, uid, packageName, requiredSignaturesImmutable); + } + }; + } + + private static Status checkUidSignature( + PackageManager packageManager, + int uid, + String packageName, + ImmutableList requiredSignatures) { + String[] packages = packageManager.getPackagesForUid(uid); + if (packages == null) { + return Status.UNAUTHENTICATED.withDescription( + "Rejected by signature check security policy"); + } + boolean packageNameMatched = false; + for (String pkg : packages) { + if (!packageName.equals(pkg)) { + continue; + } + packageNameMatched = true; + if (checkPackageSignature(packageManager, pkg, requiredSignatures)) { + return Status.OK; + } + } + return Status.PERMISSION_DENIED.withDescription( + "Rejected by signature check security policy. Package name matched: " + + packageNameMatched); + } + + /** + * Checks if the signature of {@code packageName} matches one of the given signatures. + * + * @param packageName the package to be checked + * @param requiredSignatures list of signatures. + * @return {@code true} if {@code packageName} has a matching signature. + */ + @SuppressWarnings("deprecation") // For PackageInfo.signatures + @SuppressLint("PackageManagerGetSignatures") // We only allow 1 signature. + private static boolean checkPackageSignature( + PackageManager packageManager, + String packageName, + ImmutableList requiredSignatures) { + PackageInfo packageInfo; + try { + if (Build.VERSION.SDK_INT >= 28) { + packageInfo = + packageManager.getPackageInfo(packageName, PackageManager.GET_SIGNING_CERTIFICATES); + if (packageInfo.signingInfo == null) { + return false; + } + Signature[] signatures = + packageInfo.signingInfo.hasMultipleSigners() + ? packageInfo.signingInfo.getApkContentsSigners() + : packageInfo.signingInfo.getSigningCertificateHistory(); + + for (Signature signature : signatures) { + if (requiredSignatures.contains(signature)) { + return true; + } + } + } else { + packageInfo = packageManager.getPackageInfo(packageName, PackageManager.GET_SIGNATURES); + if (packageInfo.signatures == null || packageInfo.signatures.length != 1) { + // Reject multiply-signed apks because of b/13678484 + // (See PackageManagerGetSignatures supression above). + return false; + } + + if (requiredSignatures.contains(packageInfo.signatures[0])) { + return true; + } + } + } catch (NameNotFoundException nnfe) { + return false; + } + return false; + } } diff --git a/binder/src/main/java/io/grpc/binder/SecurityPolicy.java b/binder/src/main/java/io/grpc/binder/SecurityPolicy.java index d7dad53fdc83..d13f3a863fd2 100644 --- a/binder/src/main/java/io/grpc/binder/SecurityPolicy.java +++ b/binder/src/main/java/io/grpc/binder/SecurityPolicy.java @@ -23,6 +23,11 @@ /** * Decides whether a given Android UID is authorized to access some resource. * + * While it's possible to extend this class to define your own policy, it's strongly + * recommended that you only use the policies provided by the {@link SecurityPolicies} or + * {@link UntrustedSecurityPolicies} classes. Implementing your own security policy requires + * significant care, and an understanding of the details and pitfalls of Android security. + * *

IMPORTANT For any concrete extensions of this class, it's assumed that the * authorization status of a given UID will not change as long as a process with that UID is * alive. diff --git a/binder/src/main/java/io/grpc/binder/UntrustedSecurityPolicies.java b/binder/src/main/java/io/grpc/binder/UntrustedSecurityPolicies.java new file mode 100644 index 000000000000..7c842b025acb --- /dev/null +++ b/binder/src/main/java/io/grpc/binder/UntrustedSecurityPolicies.java @@ -0,0 +1,47 @@ +/* + * Copyright 2021 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.binder; + +import io.grpc.ExperimentalApi; +import io.grpc.Status; +import javax.annotation.CheckReturnValue; + +/** + * Static factory methods for creating untrusted security policies. + */ +@CheckReturnValue +@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8022") +public final class UntrustedSecurityPolicies { + + private UntrustedSecurityPolicies() {} + + /** + * Return a security policy which allows any peer on device. + * Servers should only use this policy if they intend to expose + * a service to all applications on device. + * Clients should only use this policy if they don't need to trust the + * application they're connecting to. + */ + public static SecurityPolicy untrustedPublic() { + return new SecurityPolicy() { + @Override + public Status checkAuthorization(int uid) { + return Status.OK; + } + }; + } +} diff --git a/binder/src/test/java/io/grpc/binder/SecurityPoliciesTest.java b/binder/src/test/java/io/grpc/binder/SecurityPoliciesTest.java index 6fd9e22ebaac..86edb5ad7df2 100644 --- a/binder/src/test/java/io/grpc/binder/SecurityPoliciesTest.java +++ b/binder/src/test/java/io/grpc/binder/SecurityPoliciesTest.java @@ -17,22 +17,64 @@ package io.grpc.binder; import static com.google.common.truth.Truth.assertThat; +import static org.robolectric.Shadows.shadowOf; +import android.content.Context; +import android.content.pm.PackageInfo; +import android.content.pm.PackageManager; +import android.content.pm.Signature; import android.os.Process; +import androidx.test.core.app.ApplicationProvider; +import com.google.common.collect.ImmutableList; import io.grpc.Status; +import io.grpc.binder.SecurityPolicy; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.robolectric.RobolectricTestRunner; @RunWith(RobolectricTestRunner.class) public final class SecurityPoliciesTest { + private static final int MY_UID = Process.myUid(); private static final int OTHER_UID = MY_UID + 1; + private static final int OTHER_UID_SAME_SIGNATURE = MY_UID + 2; + private static final int OTHER_UID_NO_SIGNATURE = MY_UID + 3; + private static final int OTHER_UID_UNKNOWN = MY_UID + 4; private static final String PERMISSION_DENIED_REASONS = "some reasons"; + private static final Signature SIG1 = new Signature("1234"); + private static final Signature SIG2 = new Signature("4321"); + + private static final String OTHER_UID_PACKAGE_NAME = "other.package"; + private static final String OTHER_UID_SAME_SIGNATURE_PACKAGE_NAME = "other.package.samesignature"; + private static final String OTHER_UID_NO_SIGNATURE_PACKAGE_NAME = "other.package.nosignature"; + + private Context appContext; + private PackageManager packageManager; + private SecurityPolicy policy; + @Before + public void setUp() { + appContext = ApplicationProvider.getApplicationContext(); + packageManager = appContext.getPackageManager(); + installPackage(MY_UID, appContext.getPackageName(), SIG1); + installPackage(OTHER_UID, OTHER_UID_PACKAGE_NAME, SIG2); + installPackage(OTHER_UID_SAME_SIGNATURE, OTHER_UID_SAME_SIGNATURE_PACKAGE_NAME, SIG1); + installPackage(OTHER_UID_NO_SIGNATURE, OTHER_UID_NO_SIGNATURE_PACKAGE_NAME); + } + + @SuppressWarnings("deprecation") + private void installPackage(int uid, String packageName, Signature... signatures) { + PackageInfo info = new PackageInfo(); + info.packageName = packageName; + info.signatures = signatures; + shadowOf(packageManager).installPackage(info); + shadowOf(packageManager).setPackagesForUid(uid, packageName); + } + @Test public void testInternalOnly() throws Exception { policy = SecurityPolicies.internalOnly(); @@ -53,4 +95,80 @@ public void testPermissionDenied() throws Exception { assertThat(policy.checkAuthorization(OTHER_UID).getDescription()) .isEqualTo(PERMISSION_DENIED_REASONS); } + + @Test + public void testHasSignature_succeedsIfPackageNameAndSignaturesMatch() + throws Exception { + policy = SecurityPolicies.hasSignature(packageManager, OTHER_UID_PACKAGE_NAME, SIG2); + + // THEN UID for package that has SIG2 will be authorized + assertThat(policy.checkAuthorization(OTHER_UID).getCode()).isEqualTo(Status.OK.getCode()); + } + + @Test + public void testHasSignature_failsIfPackageNameDoesNotMatch() throws Exception { + policy = SecurityPolicies.hasSignature(packageManager, appContext.getPackageName(), SIG1); + + // THEN UID for package that has SIG1 but different package name will not be authorized + assertThat(policy.checkAuthorization(OTHER_UID_SAME_SIGNATURE).getCode()) + .isEqualTo(Status.PERMISSION_DENIED.getCode()); + } + + @Test + public void testHasSignature_failsIfSignatureDoesNotMatch() throws Exception { + policy = SecurityPolicies.hasSignature(packageManager, OTHER_UID_PACKAGE_NAME, SIG1); + + // THEN UID for package that doesn't have SIG1 will not be authorized + assertThat(policy.checkAuthorization(OTHER_UID).getCode()) + .isEqualTo(Status.PERMISSION_DENIED.getCode()); + } + + @Test + public void testOneOfSignatures_succeedsIfPackageNameAndSignaturesMatch() + throws Exception { + policy = + SecurityPolicies.oneOfSignatures( + packageManager, OTHER_UID_PACKAGE_NAME, ImmutableList.of(SIG2)); + + // THEN UID for package that has SIG2 will be authorized + assertThat(policy.checkAuthorization(OTHER_UID).getCode()).isEqualTo(Status.OK.getCode()); + } + + @Test + public void testOneOfSignature_failsIfAllSignaturesDoNotMatch() throws Exception { + policy = + SecurityPolicies.oneOfSignatures( + packageManager, + appContext.getPackageName(), + ImmutableList.of(SIG1, new Signature("1314"))); + + // THEN UID for package that has SIG1 but different package name will not be authorized + assertThat(policy.checkAuthorization(OTHER_UID_SAME_SIGNATURE).getCode()) + .isEqualTo(Status.PERMISSION_DENIED.getCode()); + } + + @Test + public void testOneOfSignature_succeedsIfPackageNameAndOneOfSignaturesMatch() + throws Exception { + policy = + SecurityPolicies.oneOfSignatures( + packageManager, + OTHER_UID_PACKAGE_NAME, + ImmutableList.of(SIG1, SIG2)); + + // THEN UID for package that has SIG2 will be authorized + assertThat(policy.checkAuthorization(OTHER_UID).getCode()).isEqualTo(Status.OK.getCode()); + } + + @Test + public void testHasSignature_failsIfUidUnknown() throws Exception { + policy = + SecurityPolicies.hasSignature( + packageManager, + appContext.getPackageName(), + SIG1); + + assertThat(policy.checkAuthorization(OTHER_UID_UNKNOWN).getCode()) + .isEqualTo(Status.UNAUTHENTICATED.getCode()); + } } From c1e19af86dea0b9a8725969a95e116029397ad4d Mon Sep 17 00:00:00 2001 From: Terry Wilson Date: Tue, 2 Nov 2021 12:47:47 -0700 Subject: [PATCH 6/7] grpclb: fallback timer only when not already using fallback backends. (#8646) Addresses a problem where we initially only resolve addresses to the backends, but not the load balancer and then later resolve addresses to both. In this situation the fallback timer was started during the second instance even if it resulted in the timer later failing as we were already using fallback backends. This change assures that a fallback time is only ever started if we are not already using the fallback backends. This is a follow-up fix to #8253. --- .../main/java/io/grpc/grpclb/GrpclbState.java | 5 ++-- .../grpc/grpclb/GrpclbLoadBalancerTest.java | 27 +++++++++++++++++++ 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java index 8607d3996a5d..1eebaa63a8e8 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java @@ -287,8 +287,9 @@ void handleAddresses( cancelLbRpcRetryTimer(); startLbRpc(); } - // Start the fallback timer if it's never started - if (fallbackTimer == null) { + // Start the fallback timer if it's never started and we are not already using fallback + // backends. + if (fallbackTimer == null && !usingFallbackBackends) { fallbackTimer = syncContext.schedule( new FallbackModeTask(BALANCER_TIMEOUT_STATUS), FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS, timerService); diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index cb231c6c055f..293c0aa0b82a 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -1462,6 +1462,33 @@ public void grpclbFallback_noBalancerAddress() { .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); } + /** + * A test for a situation where we first only get backend addresses resolved and then in a + * later name resolution get both backend and load balancer addresses. The first instance + * will switch us to using fallback backends and it is important that in the second instance + * we do not start a fallback timer as it will fail when it triggers if the fallback backends + * are already in use. + */ + @Test + public void grpclbFallback_noTimerWhenAlreadyInFallback() { + // Initially we only get backend addresses without any LB ones. This should get us to use + // fallback backends from the start as we won't be able to even talk to the load balancer. + // No fallback timer would be started as we already started to use fallback backends. + deliverResolvedAddresses(createResolvedBalancerAddresses(1), + Collections.emptyList()); + assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + + // Later a new name resolution call happens and we get both backend and LB addresses. Since we + // are already operating with fallback backends a fallback timer should not be started to move + // us to fallback mode. + deliverResolvedAddresses(Collections.emptyList(), + createResolvedBalancerAddresses(1)); + + // If a fallback timer is started it will eventually throw an exception when it tries to switch + // us to using fallback backends when we already are using them. + assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + } + @Test public void grpclbFallback_balancerLost() { subtestGrpclbFallbackConnectionLost(true, false); From 0000cba665c69958355b639474c7387d98afcc79 Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Tue, 2 Nov 2021 13:20:41 -0700 Subject: [PATCH 7/7] xds: add xds end to end interop test (#8618) Add AbstractXdsInteropTest, XdsTestControlPlaneService and only ping-pong testcase in initial implementation. AbstractXdsInteropTest sets up the test control plane, create xdsClient and xdServer using bootstrap override, test case extending AbstractXdsInteropTest is supposed to override the control plane config and run the verification. XdsTestControlPlaneService only has static xds configurations, not able to keep states. How to run: ./gradlew :grpc-interop-testing:installDist -PskipCodegen=true ./interop-testing/build/install/grpc-interop-testing/bin/xds-e2e-test-client --- interop-testing/build.gradle | 8 + .../integration/AbstractXdsInteropTest.java | 337 ++++++++++++++++++ .../testing/integration/XdsInteropTest.java | 57 +++ .../XdsTestControlPlaneService.java | 269 ++++++++++++++ 4 files changed, 671 insertions(+) create mode 100644 interop-testing/src/main/java/io/grpc/testing/integration/AbstractXdsInteropTest.java create mode 100644 interop-testing/src/main/java/io/grpc/testing/integration/XdsInteropTest.java create mode 100644 interop-testing/src/main/java/io/grpc/testing/integration/XdsTestControlPlaneService.java diff --git a/interop-testing/build.gradle b/interop-testing/build.gradle index 944c0daab811..ef7510c17238 100644 --- a/interop-testing/build.gradle +++ b/interop-testing/build.gradle @@ -147,6 +147,13 @@ task xds_test_server(type: CreateStartScripts) { classpath = startScripts.classpath } +task xds_e2e_client(type: CreateStartScripts) { + mainClassName = "io.grpc.testing.integration.XdsInteropTest" + applicationName = "xds-e2e-test-client" + outputDir = new File(project.buildDir, 'tmp/scripts/' + name) + classpath = startScripts.classpath +} + applicationDistribution.into("bin") { from(test_client) from(test_server) @@ -157,6 +164,7 @@ applicationDistribution.into("bin") { from(grpclb_fallback_test_client) from(xds_test_client) from(xds_test_server) + from(xds_e2e_client) fileMode = 0755 } diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractXdsInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractXdsInteropTest.java new file mode 100644 index 000000000000..0bd2318b0fc5 --- /dev/null +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractXdsInteropTest.java @@ -0,0 +1,337 @@ +/* + * Copyright 2021 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.testing.integration; + +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.Any; +import com.google.protobuf.UInt32Value; +import io.grpc.Grpc; +import io.grpc.InsecureChannelCredentials; +import io.grpc.InsecureServerCredentials; +import io.grpc.ManagedChannel; +import io.grpc.NameResolverRegistry; +import io.grpc.Server; +import io.grpc.netty.NettyServerBuilder; +import io.grpc.xds.XdsNameResolverProvider; +import io.grpc.xds.XdsServerBuilder; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.core.v3.Address; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.core.v3.AggregatedConfigSource; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.core.v3.ConfigSource; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.core.v3.HealthStatus; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.core.v3.SocketAddress; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.core.v3.TrafficDirection; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.endpoint.v3.Endpoint; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.listener.v3.ApiListener; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.listener.v3.Filter; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.listener.v3.FilterChain; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.listener.v3.FilterChainMatch; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.listener.v3.Listener; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.route.v3.NonForwardingAction; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.route.v3.Route; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.route.v3.RouteAction; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.route.v3.RouteConfiguration; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.route.v3.RouteMatch; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.route.v3.VirtualHost; +import io.grpc.xds.shaded.io.envoyproxy.envoy.extensions.filters.http.router.v3.Router; +import io.grpc.xds.shaded.io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager; +import io.grpc.xds.shaded.io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter; +import io.grpc.xds.shaded.io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.Rds; + +import java.util.Collections; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Abstract base class for end-to-end xds tests. + * A local control plane is implemented in {@link XdsTestControlPlaneService}. + * Test cases can inject xds configs to the control plane for testing. + */ +public abstract class AbstractXdsInteropTest { + private static final Logger logger = Logger.getLogger(AbstractXdsInteropTest.class.getName()); + + protected static final int testServerPort = 8080; + private static final int controlPlaneServicePort = 443; + private Server server; + private Server controlPlane; + protected TestServiceGrpc.TestServiceBlockingStub blockingStub; + private ScheduledExecutorService executor; + private XdsNameResolverProvider nameResolverProvider; + private static final String scheme = "test-xds"; + private static final String serverHostName = "0.0.0.0:" + testServerPort; + private static final String SERVER_LISTENER_TEMPLATE = + "grpc/server?udpa.resource.listening_address=%s"; + private static final String rdsName = "route-config.googleapis.com"; + private static final String clusterName = "cluster0"; + private static final String edsName = "eds-service-0"; + private static final String HTTP_CONNECTION_MANAGER_TYPE_URL = + "type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3" + + ".HttpConnectionManager"; + + private static final Map defaultClientBootstrapOverride = ImmutableMap.of( + "node", ImmutableMap.of( + "id", UUID.randomUUID().toString(), + "cluster", "cluster0"), + "xds_servers", Collections.singletonList( + ImmutableMap.of( + "server_uri", "localhost:" + controlPlaneServicePort, + "channel_creds", Collections.singletonList( + ImmutableMap.of("type", "insecure") + ), + "server_features", Collections.singletonList("xds_v3") + ) + ) + ); + + /** + * Provides default client bootstrap. + * A subclass test case should override this method if it tests client bootstrap. + */ + protected Map getClientBootstrapOverride() { + return defaultClientBootstrapOverride; + } + + private static final Map defaultServerBootstrapOverride = ImmutableMap.of( + "node", ImmutableMap.of( + "id", UUID.randomUUID().toString()), + "xds_servers", Collections.singletonList( + ImmutableMap.of( + "server_uri", "localhost:" + controlPlaneServicePort, + "channel_creds", Collections.singletonList( + ImmutableMap.of("type", "insecure") + ), + "server_features", Collections.singletonList("xds_v3") + ) + ), + "server_listener_resource_name_template", SERVER_LISTENER_TEMPLATE + ); + + /** + * Provides default server bootstrap. + * A subclass test case should override this method if it tests server bootstrap. + */ + protected Map getServerBootstrapOverride() { + return defaultServerBootstrapOverride; + } + + protected void setUp() throws Exception { + startControlPlane(); + startServer(); + nameResolverProvider = XdsNameResolverProvider.createForTest(scheme, + getClientBootstrapOverride()); + NameResolverRegistry.getDefaultRegistry().register(nameResolverProvider); + ManagedChannel channel = Grpc.newChannelBuilder(scheme + ":///" + serverHostName, + InsecureChannelCredentials.create()).build(); + blockingStub = TestServiceGrpc.newBlockingStub(channel); + } + + protected void tearDown() throws Exception { + if (server != null) { + server.shutdownNow(); + if (!server.awaitTermination(5, TimeUnit.SECONDS)) { + logger.log(Level.SEVERE, "Timed out waiting for server shutdown"); + } + } + if (controlPlane != null) { + controlPlane.shutdownNow(); + if (!controlPlane.awaitTermination(5, TimeUnit.SECONDS)) { + logger.log(Level.SEVERE, "Timed out waiting for server shutdown"); + } + } + if (executor != null) { + MoreExecutors.shutdownAndAwaitTermination(executor, 5, TimeUnit.SECONDS); + } + NameResolverRegistry.getDefaultRegistry().deregister(nameResolverProvider); + } + + protected void startServer() throws Exception { + executor = Executors.newSingleThreadScheduledExecutor(); + XdsServerBuilder serverBuilder = XdsServerBuilder.forPort( + testServerPort, InsecureServerCredentials.create()) + .addService(new TestServiceImpl(executor)) + .overrideBootstrapForTest(getServerBootstrapOverride()); + server = serverBuilder.build().start(); + } + + /** + * Provides default control plane xds configs. + * A subclass test case should override this method to inject control plane xds configs to verify + * end-to-end behavior. + */ + protected XdsTestControlPlaneService.XdsTestControlPlaneConfig getControlPlaneConfig() { + String tcpListenerName = SERVER_LISTENER_TEMPLATE.replaceAll("%s", serverHostName); + return new XdsTestControlPlaneService.XdsTestControlPlaneConfig( + Collections.singletonList(serverListener(tcpListenerName, serverHostName)), + Collections.singletonList(clientListener(serverHostName)), + Collections.singletonList(rds(serverHostName)), + Collections.singletonList(cds()), + Collections.singletonList(eds(testServerPort)) + ); + } + + private void startControlPlane() throws Exception { + XdsTestControlPlaneService.XdsTestControlPlaneConfig controlPlaneConfig = + getControlPlaneConfig(); + logger.log(Level.FINER, "Starting control plane with config: {0}", controlPlaneConfig); + XdsTestControlPlaneService controlPlaneService = new XdsTestControlPlaneService( + controlPlaneConfig); + NettyServerBuilder controlPlaneServerBuilder = + NettyServerBuilder.forPort(controlPlaneServicePort) + .addService(controlPlaneService); + controlPlane = controlPlaneServerBuilder.build().start(); + } + + /** + * A subclass test case should override this method to verify end-to-end behaviour. + */ + abstract void run(); + + private static Listener clientListener(String name) { + HttpFilter httpFilter = HttpFilter.newBuilder() + .setName("terminal-filter") + .setTypedConfig(Any.pack(Router.newBuilder().build())) + .setIsOptional(true) + .build(); + ApiListener apiListener = ApiListener.newBuilder().setApiListener(Any.pack( + HttpConnectionManager.newBuilder() + .setRds( + Rds.newBuilder() + .setRouteConfigName(rdsName) + .setConfigSource( + ConfigSource.newBuilder() + .setAds(AggregatedConfigSource.getDefaultInstance()))) + .addAllHttpFilters(Collections.singletonList(httpFilter)) + .build(), + HTTP_CONNECTION_MANAGER_TYPE_URL) + ).build(); + Listener listener = Listener.newBuilder() + .setName(name) + .setApiListener(apiListener).build(); + return listener; + } + + private static Listener serverListener(String name, String authority) { + HttpFilter routerFilter = HttpFilter.newBuilder() + .setName("terminal-filter") + .setTypedConfig( + Any.pack(Router.newBuilder().build())) + .setIsOptional(true) + .build(); + VirtualHost virtualHost = VirtualHost.newBuilder() + .setName("virtual-host-0") + .addDomains(authority) + .addRoutes( + Route.newBuilder() + .setMatch( + RouteMatch.newBuilder().setPrefix("/").build() + ) + .setNonForwardingAction(NonForwardingAction.newBuilder().build()) + .build() + ).build(); + RouteConfiguration routeConfig = RouteConfiguration.newBuilder() + .addVirtualHosts(virtualHost) + .build(); + Filter filter = Filter.newBuilder() + .setName("network-filter-0") + .setTypedConfig( + Any.pack( + HttpConnectionManager.newBuilder() + .setRouteConfig(routeConfig) + .addAllHttpFilters(Collections.singletonList(routerFilter)) + .build() + ) + ).build(); + FilterChainMatch filterChainMatch = FilterChainMatch.newBuilder() + .setSourceType(FilterChainMatch.ConnectionSourceType.ANY) + .build(); + FilterChain filterChain = FilterChain.newBuilder() + .setName("filter-chain-0") + .setFilterChainMatch(filterChainMatch) + .addFilters(filter) + .build(); + return Listener.newBuilder() + .setName(name) + .setTrafficDirection(TrafficDirection.INBOUND) + .addFilterChains(filterChain) + .build(); + } + + private static RouteConfiguration rds(String authority) { + VirtualHost virtualHost = VirtualHost.newBuilder() + .addDomains(authority) + .addRoutes( + Route.newBuilder() + .setMatch( + RouteMatch.newBuilder().setPrefix("/").build() + ) + .setRoute( + RouteAction.newBuilder().setCluster(clusterName).build() + ) + .build()) + .build(); + return RouteConfiguration.newBuilder().setName(rdsName).addVirtualHosts(virtualHost).build(); + } + + private static Cluster cds() { + return Cluster.newBuilder() + .setName(clusterName) + .setType(Cluster.DiscoveryType.EDS) + .setEdsClusterConfig( + Cluster.EdsClusterConfig.newBuilder() + .setServiceName(edsName) + .setEdsConfig( + ConfigSource.newBuilder() + .setAds(AggregatedConfigSource.newBuilder().build()) + .build()) + .build() + ) + .setLbPolicy(Cluster.LbPolicy.ROUND_ROBIN) + .build(); + } + + private static ClusterLoadAssignment eds(int port) { + Address address = Address.newBuilder() + .setSocketAddress( + SocketAddress.newBuilder().setAddress("0.0.0.0").setPortValue(port).build() + ) + .build(); + LocalityLbEndpoints endpoints = LocalityLbEndpoints.newBuilder() + .setLoadBalancingWeight(UInt32Value.of(10)) + .setPriority(0) + .addLbEndpoints( + LbEndpoint.newBuilder() + .setEndpoint( + Endpoint.newBuilder().setAddress(address).build()) + .setHealthStatus(HealthStatus.HEALTHY) + .build() + ) + .build(); + return ClusterLoadAssignment.newBuilder() + .setClusterName(edsName) + .addEndpoints(endpoints) + .build(); + } +} diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/XdsInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/XdsInteropTest.java new file mode 100644 index 000000000000..410b65d37d98 --- /dev/null +++ b/interop-testing/src/main/java/io/grpc/testing/integration/XdsInteropTest.java @@ -0,0 +1,57 @@ +/* + * Copyright 2021 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.testing.integration; + +import static org.junit.Assert.assertEquals; + +import com.google.protobuf.ByteString; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class XdsInteropTest { + private static final Logger logger = Logger.getLogger(XdsInteropTest.class.getName()); + + /** + * The main application to run test cases. + */ + public static void main(String[] args) throws Exception { + AbstractXdsInteropTest testCase = new PingPong(); + testCase.setUp(); + try { + testCase.run(); + } finally { + testCase.tearDown(); + } + } + + private static class PingPong extends AbstractXdsInteropTest { + @Override + void run() { + Messages.SimpleRequest request = Messages.SimpleRequest.newBuilder() + .setResponseSize(3141) + .setPayload(Messages.Payload.newBuilder() + .setBody(ByteString.copyFrom(new byte[2728]))) + .build(); + Messages.SimpleResponse goldenResponse = Messages.SimpleResponse.newBuilder() + .setPayload(Messages.Payload.newBuilder() + .setBody(ByteString.copyFrom(new byte[3141]))) + .build(); + assertEquals(goldenResponse.getPayload(), blockingStub.unaryCall(request).getPayload()); + logger.log(Level.INFO, "success"); + } + } +} diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestControlPlaneService.java b/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestControlPlaneService.java new file mode 100644 index 000000000000..06a4d2467c94 --- /dev/null +++ b/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestControlPlaneService.java @@ -0,0 +1,269 @@ +/* + * Copyright 2021 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package io.grpc.testing.integration; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.Any; +import io.grpc.SynchronizationContext; +import io.grpc.stub.StreamObserver; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.listener.v3.Listener; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.route.v3.RouteConfiguration; +import io.grpc.xds.shaded.io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc; +import io.grpc.xds.shaded.io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; +import io.grpc.xds.shaded.io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class XdsTestControlPlaneService extends + AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase { + private static final Logger logger = Logger.getLogger(XdsInteropTest.class.getName()); + + private final SynchronizationContext syncContext = new SynchronizationContext( + new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + logger.log(Level.SEVERE, "Exception!" + e); + } + }); + + private static final String ADS_TYPE_URL_LDS = + "type.googleapis.com/envoy.config.listener.v3.Listener"; + private static final String ADS_TYPE_URL_RDS = + "type.googleapis.com/envoy.config.route.v3.RouteConfiguration"; + private static final String ADS_TYPE_URL_CDS = + "type.googleapis.com/envoy.config.cluster.v3.Cluster"; + private static final String ADS_TYPE_URL_EDS = + "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment"; + private final ImmutableMap ldsResources; + private final ImmutableMap rdsResources; + private final ImmutableMap cdsResources; + private final ImmutableMap edsResources; + private int ldsVersion = 1; + private int rdsVersion = 1; + private int cdsVersion = 1; + private int edsVersion = 1; + private int ldsNonce = 0; + private int rdsNonce = 0; + private int cdsNonce = 0; + private int edsNonce = 0; + + /** + * Create a control plane service for testing, with static xds configurations. + */ + public XdsTestControlPlaneService(XdsTestControlPlaneConfig config) { + Map ldsMap = new HashMap<>(); + for (Listener apiListener: config.apiListener) { + ldsMap.put(apiListener.getName(), apiListener); + } + for (Listener tcpListener: config.tcpListener) { + ldsMap.put(tcpListener.getName(), tcpListener); + } + this.ldsResources = ImmutableMap.copyOf(ldsMap); + + Map rdsMap = new HashMap<>(); + for (RouteConfiguration rds:config.rds) { + rdsMap.put(rds.getName(), rds); + } + this.rdsResources = ImmutableMap.copyOf(rdsMap); + + Map cdsMap = new HashMap<>(); + for (Cluster cds:config.cds) { + cdsMap.put(cds.getName(), cds); + } + this.cdsResources = ImmutableMap.copyOf(cdsMap); + + Map edsMap = new HashMap<>(); + for (ClusterLoadAssignment eds:config.eds) { + edsMap.put(eds.getClusterName(), eds); + } + this.edsResources = ImmutableMap.copyOf(edsMap); + logger.log(Level.FINER, "control plane config created. " + + "Dumping resources lds:{0},\nrds:{1},\ncds:{2},\neds:{3}", + new Object[]{ldsMap, rdsMap, cdsMap, edsMap}); + } + + public static class XdsTestControlPlaneConfig { + ImmutableList tcpListener; + ImmutableList apiListener; + ImmutableList rds; + ImmutableList cds; + ImmutableList eds; + + /** + * Provides control plane xds configurations. + */ + public XdsTestControlPlaneConfig(List tcpListener, + List apiListener, + List rds, + List cds, + List eds) { + this.tcpListener = ImmutableList.copyOf(tcpListener); + this.apiListener = ImmutableList.copyOf(apiListener); + this.rds = ImmutableList.copyOf(rds); + this.cds = ImmutableList.copyOf(cds); + this.eds = ImmutableList.copyOf(eds); + } + } + + @Override + public StreamObserver streamAggregatedResources( + final StreamObserver responseObserver) { + final StreamObserver requestObserver = + new StreamObserver() { + @Override + public void onNext(final DiscoveryRequest value) { + syncContext.execute(new Runnable() { + @Override + public void run() { + logger.log(Level.FINEST, "control plane received request {0}", value); + if (value.hasErrorDetail()) { + logger.log(Level.FINE, "control plane received nack resource {0}, error {1}", + new Object[]{value.getResourceNamesList(), value.getErrorDetail()}); + return; + } + if (value.getResourceNamesCount() <= 0) { + return; + } + switch (value.getTypeUrl()) { + case ADS_TYPE_URL_LDS: + if (!value.getResponseNonce().isEmpty() + && !String.valueOf(ldsNonce).equals(value.getResponseNonce())) { + logger.log(Level.FINE, "lds resource nonce does not match, ignore."); + return; + } + if (String.valueOf(ldsVersion).equals(value.getVersionInfo())) { + logger.log(Level.FINEST, "control plane received ack for lds resource: {0}", + value.getResourceNamesList()); + return; + } + DiscoveryResponse.Builder responseBuilder = DiscoveryResponse.newBuilder() + .setTypeUrl(ADS_TYPE_URL_LDS) + .setVersionInfo(String.valueOf(ldsVersion++)) + .setNonce(String.valueOf(++ldsNonce)); + for (String ldsName: value.getResourceNamesList()) { + if (ldsResources.containsKey(ldsName)) { + responseBuilder.addResources(Any.pack( + ldsResources.get(ldsName), + ADS_TYPE_URL_LDS + )); + } + } + responseObserver.onNext(responseBuilder.build()); + break; + case ADS_TYPE_URL_RDS: + if (!value.getResponseNonce().isEmpty() + && !String.valueOf(rdsNonce).equals(value.getResponseNonce())) { + logger.log(Level.FINE, "rds resource nonce does not match, ignore."); + return; + } + if (String.valueOf(rdsVersion).equals(value.getVersionInfo())) { + logger.log(Level.FINEST, "control plane received ack for rds resource: {0}", + value.getResourceNamesList()); + return; + } + responseBuilder = DiscoveryResponse.newBuilder() + .setTypeUrl(ADS_TYPE_URL_RDS) + .setVersionInfo(String.valueOf(rdsVersion++)) + .setNonce(String.valueOf(++rdsNonce)); + for (String rdsName: value.getResourceNamesList()) { + if (rdsResources.containsKey(rdsName)) { + responseBuilder.addResources(Any.pack( + rdsResources.get(rdsName), + ADS_TYPE_URL_RDS + )); + } + } + responseObserver.onNext(responseBuilder.build()); + break; + case ADS_TYPE_URL_CDS: + if (!value.getResponseNonce().isEmpty() + && !String.valueOf(cdsNonce).equals(value.getResponseNonce())) { + logger.log(Level.FINE, "cds resource nonce does not match, ignore."); + return; + } + if (String.valueOf(cdsVersion).equals(value.getVersionInfo())) { + logger.log(Level.FINEST, "control plane received ack for cds resource: {0}", + value.getResourceNamesList()); + return; + } + responseBuilder = DiscoveryResponse.newBuilder() + .setTypeUrl(ADS_TYPE_URL_CDS) + .setVersionInfo(String.valueOf(cdsVersion++)) + .setNonce(String.valueOf(++cdsNonce)); + for (String cdsName: value.getResourceNamesList()) { + if (cdsResources.containsKey(cdsName)) { + responseBuilder.addResources(Any.pack( + cdsResources.get(cdsName), + ADS_TYPE_URL_CDS + )); + } + } + responseObserver.onNext(responseBuilder.build()); + break; + case ADS_TYPE_URL_EDS: + if (!value.getResponseNonce().isEmpty() + && !String.valueOf(edsNonce).equals(value.getResponseNonce())) { + logger.log(Level.FINE, "eds resource nonce does not match, ignore."); + return; + } + if (String.valueOf(edsVersion).equals(value.getVersionInfo())) { + logger.log(Level.FINEST, "control plane received ack for eds resource: {0}", + value.getResourceNamesList()); + return; + } + responseBuilder = DiscoveryResponse.newBuilder() + .setTypeUrl(ADS_TYPE_URL_EDS) + .setVersionInfo(String.valueOf(edsVersion++)) + .setNonce(String.valueOf(++edsNonce)); + for (String edsName: value.getResourceNamesList()) { + if (edsResources.containsKey(edsName)) { + responseBuilder.addResources(Any.pack( + edsResources.get(value.getResourceNames(0)), + ADS_TYPE_URL_EDS + )); + } + } + responseObserver.onNext(responseBuilder.build()); + break; + default: + logger.log(Level.WARNING, "unrecognized typeUrl in discoveryRequest: {0}", + value.getTypeUrl()); + } + } + }); + } + + @Override + public void onError(Throwable t) { + logger.log(Level.FINE, "Control plane error: {0} ", t); + } + + @Override + public void onCompleted() { + responseObserver.onCompleted(); + } + }; + return requestObserver; + } +}