diff --git a/binder/src/androidTest/java/io/grpc/binder/BinderChannelSmokeTest.java b/binder/src/androidTest/java/io/grpc/binder/BinderChannelSmokeTest.java index dd9cf26bd3d0..41ea76146de2 100644 --- a/binder/src/androidTest/java/io/grpc/binder/BinderChannelSmokeTest.java +++ b/binder/src/androidTest/java/io/grpc/binder/BinderChannelSmokeTest.java @@ -36,11 +36,13 @@ import io.grpc.ManagedChannel; import io.grpc.Metadata; import io.grpc.MethodDescriptor; +import io.grpc.NameResolverRegistry; import io.grpc.Server; import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptors; import io.grpc.ServerServiceDefinition; import io.grpc.internal.GrpcUtil; +import io.grpc.internal.testing.FakeNameResolverProvider; import io.grpc.stub.ClientCalls; import io.grpc.stub.ServerCalls; import io.grpc.stub.StreamObserver; @@ -66,6 +68,7 @@ public final class BinderChannelSmokeTest { private static final int SLIGHTLY_MORE_THAN_ONE_BLOCK = 16 * 1024 + 100; private static final String MSG = "Some text which will be repeated many many times"; + private static final String SERVER_TARGET_URI = "fake://server"; final MethodDescriptor method = MethodDescriptor.newBuilder(StringMarshaller.INSTANCE, StringMarshaller.INSTANCE) @@ -85,7 +88,7 @@ public final class BinderChannelSmokeTest { .setType(MethodDescriptor.MethodType.BIDI_STREAMING) .build(); - AndroidComponentAddress serverAddress; + FakeNameResolverProvider fakeNameResolverProvider; ManagedChannel channel; AtomicReference headersCapture = new AtomicReference<>(); @@ -118,6 +121,8 @@ public void setUp() throws Exception { TestUtils.recordRequestHeadersInterceptor(headersCapture)); AndroidComponentAddress serverAddress = HostServices.allocateService(appContext); + fakeNameResolverProvider = new FakeNameResolverProvider(SERVER_TARGET_URI, serverAddress); + NameResolverRegistry.getDefaultRegistry().register(fakeNameResolverProvider); HostServices.configureService(serverAddress, HostServices.serviceParamsBuilder() .setServerFactory((service, receiver) -> @@ -132,6 +137,7 @@ public void setUp() throws Exception { @After public void tearDown() throws Exception { channel.shutdownNow(); + NameResolverRegistry.getDefaultRegistry().deregister(fakeNameResolverProvider); HostServices.awaitServiceShutdown(); } @@ -192,6 +198,12 @@ public void testStreamingCallOptionHeaders() throws Exception { assertThat(headersCapture.get().get(GrpcUtil.TIMEOUT_KEY)).isGreaterThan(0); } + @Test + public void testConnectViaTargetUri() throws Exception { + channel = BinderChannelBuilder.forTarget(SERVER_TARGET_URI, appContext).build(); + assertThat(doCall("Hello").get()).isEqualTo("Hello"); + } + private static String createLargeString(int size) { StringBuilder sb = new StringBuilder(); while (sb.length() < size) { diff --git a/binder/src/main/java/io/grpc/binder/BinderChannelBuilder.java b/binder/src/main/java/io/grpc/binder/BinderChannelBuilder.java index 99191cfad3c6..91e4e8f1c76a 100644 --- a/binder/src/main/java/io/grpc/binder/BinderChannelBuilder.java +++ b/binder/src/main/java/io/grpc/binder/BinderChannelBuilder.java @@ -67,13 +67,35 @@ public final class BinderChannelBuilder *

You the caller are responsible for managing the lifecycle of any channels built by the * resulting builder. They will not be shut down automatically. * - * @param targetAddress the {@link AndroidComponentAddress} referencing the service to bind to. + * @param directAddress the {@link AndroidComponentAddress} referencing the service to bind to. * @param sourceContext the context to bind from (e.g. The current Activity or Application). * @return a new builder */ public static BinderChannelBuilder forAddress( - AndroidComponentAddress targetAddress, Context sourceContext) { - return new BinderChannelBuilder(targetAddress, sourceContext); + AndroidComponentAddress directAddress, Context sourceContext) { + return new BinderChannelBuilder( + checkNotNull(directAddress, "directAddress"), null, sourceContext); + } + + /** + * Creates a channel builder that will bind to a remote Android service, via a string + * target name which will be resolved. + * + *

The underlying Android binding will be torn down when the channel becomes idle. This happens + * after 30 minutes without use by default but can be configured via {@link + * ManagedChannelBuilder#idleTimeout(long, TimeUnit)} or triggered manually with {@link + * ManagedChannel#enterIdle()}. + * + *

You the caller are responsible for managing the lifecycle of any channels built by the + * resulting builder. They will not be shut down automatically. + * + * @param target A target uri which should resolve into an {@link AndroidComponentAddress} + * referencing the service to bind to. + * @param sourceContext the context to bind from (e.g. The current Activity or Application). + * @return a new builder + */ + public static BinderChannelBuilder forTarget(String target, Context sourceContext) { + return new BinderChannelBuilder(null, checkNotNull(target, "target"), sourceContext); } /** @@ -88,7 +110,7 @@ public static BinderChannelBuilder forAddress(String name, int port) { /** * Always fails. Call {@link #forAddress(AndroidComponentAddress, Context)} instead. */ - @DoNotCall("Unsupported. Use forAddress(AndroidComponentAddress, Context) instead") + @DoNotCall("Unsupported. Use forTarget(String, Context) instead") public static BinderChannelBuilder forTarget(String target) { throw new UnsupportedOperationException( "call forAddress(AndroidComponentAddress, Context) instead"); @@ -104,9 +126,11 @@ public static BinderChannelBuilder forTarget(String target) { private BindServiceFlags bindServiceFlags; private BinderChannelBuilder( - AndroidComponentAddress targetAddress, + @Nullable AndroidComponentAddress directAddress, + @Nullable String target, Context sourceContext) { - mainThreadExecutor = ContextCompat.getMainExecutor(sourceContext); + mainThreadExecutor = + ContextCompat.getMainExecutor(checkNotNull(sourceContext, "sourceContext")); securityPolicy = SecurityPolicies.internalOnly(); inboundParcelablePolicy = InboundParcelablePolicy.DEFAULT; bindServiceFlags = BindServiceFlags.DEFAULTS; @@ -126,12 +150,20 @@ public ClientTransportFactory buildClientTransportFactory() { } } - managedChannelImplBuilder = - new ManagedChannelImplBuilder( - targetAddress, - targetAddress.getAuthority(), - new BinderChannelTransportFactoryBuilder(), - null); + if (directAddress != null) { + managedChannelImplBuilder = + new ManagedChannelImplBuilder( + directAddress, + directAddress.getAuthority(), + new BinderChannelTransportFactoryBuilder(), + null); + } else { + managedChannelImplBuilder = + new ManagedChannelImplBuilder( + target, + new BinderChannelTransportFactoryBuilder(), + null); + } } @Override diff --git a/binder/src/main/java/io/grpc/binder/SecurityPolicies.java b/binder/src/main/java/io/grpc/binder/SecurityPolicies.java index be46b9e3e54c..dcf36be00ca6 100644 --- a/binder/src/main/java/io/grpc/binder/SecurityPolicies.java +++ b/binder/src/main/java/io/grpc/binder/SecurityPolicies.java @@ -16,9 +16,20 @@ package io.grpc.binder; +import android.annotation.SuppressLint; +import android.content.pm.PackageInfo; +import android.content.pm.PackageManager; +import android.content.pm.PackageManager.NameNotFoundException; +import android.content.pm.Signature; +import android.os.Build; import android.os.Process; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import io.grpc.ExperimentalApi; import io.grpc.Status; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; import javax.annotation.CheckReturnValue; /** Static factory methods for creating standard security policies. */ @@ -55,4 +66,125 @@ public Status checkAuthorization(int uid) { } }; } + + /** + * Creates a {@link SecurityPolicy} which checks if the package signature + * matches {@code requiredSignature}. + * + * @param packageName the package name of the allowed package. + * @param requiredSignature the allowed signature of the allowed package. + * @throws NullPointerException if any of the inputs are {@code null}. + */ + public static SecurityPolicy hasSignature( + PackageManager packageManager, String packageName, Signature requiredSignature) { + return oneOfSignatures( + packageManager, packageName, ImmutableList.of(requiredSignature)); + } + + /** + * Creates a {@link SecurityPolicy} which checks if the package signature + * matches any of {@code requiredSignatures}. + * + * @param packageName the package name of the allowed package. + * @param requiredSignatures the allowed signatures of the allowed package. + * @throws NullPointerException if any of the inputs are {@code null}. + * @throws IllegalArgumentException if {@code requiredSignatures} is empty. + */ + public static SecurityPolicy oneOfSignatures( + PackageManager packageManager, + String packageName, + Collection requiredSignatures) { + Preconditions.checkNotNull(packageManager, "packageManager"); + Preconditions.checkNotNull(packageName, "packageName"); + Preconditions.checkNotNull(requiredSignatures, "requiredSignatures"); + Preconditions.checkArgument(!requiredSignatures.isEmpty(), + "requiredSignatures"); + ImmutableList requiredSignaturesImmutable = ImmutableList.copyOf(requiredSignatures); + + for (Signature requiredSignature : requiredSignaturesImmutable) { + Preconditions.checkNotNull(requiredSignature); + } + + return new SecurityPolicy() { + @Override + public Status checkAuthorization(int uid) { + return checkUidSignature( + packageManager, uid, packageName, requiredSignaturesImmutable); + } + }; + } + + private static Status checkUidSignature( + PackageManager packageManager, + int uid, + String packageName, + ImmutableList requiredSignatures) { + String[] packages = packageManager.getPackagesForUid(uid); + if (packages == null) { + return Status.UNAUTHENTICATED.withDescription( + "Rejected by signature check security policy"); + } + boolean packageNameMatched = false; + for (String pkg : packages) { + if (!packageName.equals(pkg)) { + continue; + } + packageNameMatched = true; + if (checkPackageSignature(packageManager, pkg, requiredSignatures)) { + return Status.OK; + } + } + return Status.PERMISSION_DENIED.withDescription( + "Rejected by signature check security policy. Package name matched: " + + packageNameMatched); + } + + /** + * Checks if the signature of {@code packageName} matches one of the given signatures. + * + * @param packageName the package to be checked + * @param requiredSignatures list of signatures. + * @return {@code true} if {@code packageName} has a matching signature. + */ + @SuppressWarnings("deprecation") // For PackageInfo.signatures + @SuppressLint("PackageManagerGetSignatures") // We only allow 1 signature. + private static boolean checkPackageSignature( + PackageManager packageManager, + String packageName, + ImmutableList requiredSignatures) { + PackageInfo packageInfo; + try { + if (Build.VERSION.SDK_INT >= 28) { + packageInfo = + packageManager.getPackageInfo(packageName, PackageManager.GET_SIGNING_CERTIFICATES); + if (packageInfo.signingInfo == null) { + return false; + } + Signature[] signatures = + packageInfo.signingInfo.hasMultipleSigners() + ? packageInfo.signingInfo.getApkContentsSigners() + : packageInfo.signingInfo.getSigningCertificateHistory(); + + for (Signature signature : signatures) { + if (requiredSignatures.contains(signature)) { + return true; + } + } + } else { + packageInfo = packageManager.getPackageInfo(packageName, PackageManager.GET_SIGNATURES); + if (packageInfo.signatures == null || packageInfo.signatures.length != 1) { + // Reject multiply-signed apks because of b/13678484 + // (See PackageManagerGetSignatures supression above). + return false; + } + + if (requiredSignatures.contains(packageInfo.signatures[0])) { + return true; + } + } + } catch (NameNotFoundException nnfe) { + return false; + } + return false; + } } diff --git a/binder/src/main/java/io/grpc/binder/SecurityPolicy.java b/binder/src/main/java/io/grpc/binder/SecurityPolicy.java index d7dad53fdc83..d13f3a863fd2 100644 --- a/binder/src/main/java/io/grpc/binder/SecurityPolicy.java +++ b/binder/src/main/java/io/grpc/binder/SecurityPolicy.java @@ -23,6 +23,11 @@ /** * Decides whether a given Android UID is authorized to access some resource. * + * While it's possible to extend this class to define your own policy, it's strongly + * recommended that you only use the policies provided by the {@link SecurityPolicies} or + * {@link UntrustedSecurityPolicies} classes. Implementing your own security policy requires + * significant care, and an understanding of the details and pitfalls of Android security. + * *

IMPORTANT For any concrete extensions of this class, it's assumed that the * authorization status of a given UID will not change as long as a process with that UID is * alive. diff --git a/binder/src/main/java/io/grpc/binder/UntrustedSecurityPolicies.java b/binder/src/main/java/io/grpc/binder/UntrustedSecurityPolicies.java new file mode 100644 index 000000000000..7c842b025acb --- /dev/null +++ b/binder/src/main/java/io/grpc/binder/UntrustedSecurityPolicies.java @@ -0,0 +1,47 @@ +/* + * Copyright 2021 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.binder; + +import io.grpc.ExperimentalApi; +import io.grpc.Status; +import javax.annotation.CheckReturnValue; + +/** + * Static factory methods for creating untrusted security policies. + */ +@CheckReturnValue +@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8022") +public final class UntrustedSecurityPolicies { + + private UntrustedSecurityPolicies() {} + + /** + * Return a security policy which allows any peer on device. + * Servers should only use this policy if they intend to expose + * a service to all applications on device. + * Clients should only use this policy if they don't need to trust the + * application they're connecting to. + */ + public static SecurityPolicy untrustedPublic() { + return new SecurityPolicy() { + @Override + public Status checkAuthorization(int uid) { + return Status.OK; + } + }; + } +} diff --git a/binder/src/test/java/io/grpc/binder/SecurityPoliciesTest.java b/binder/src/test/java/io/grpc/binder/SecurityPoliciesTest.java index 6fd9e22ebaac..86edb5ad7df2 100644 --- a/binder/src/test/java/io/grpc/binder/SecurityPoliciesTest.java +++ b/binder/src/test/java/io/grpc/binder/SecurityPoliciesTest.java @@ -17,22 +17,64 @@ package io.grpc.binder; import static com.google.common.truth.Truth.assertThat; +import static org.robolectric.Shadows.shadowOf; +import android.content.Context; +import android.content.pm.PackageInfo; +import android.content.pm.PackageManager; +import android.content.pm.Signature; import android.os.Process; +import androidx.test.core.app.ApplicationProvider; +import com.google.common.collect.ImmutableList; import io.grpc.Status; +import io.grpc.binder.SecurityPolicy; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.robolectric.RobolectricTestRunner; @RunWith(RobolectricTestRunner.class) public final class SecurityPoliciesTest { + private static final int MY_UID = Process.myUid(); private static final int OTHER_UID = MY_UID + 1; + private static final int OTHER_UID_SAME_SIGNATURE = MY_UID + 2; + private static final int OTHER_UID_NO_SIGNATURE = MY_UID + 3; + private static final int OTHER_UID_UNKNOWN = MY_UID + 4; private static final String PERMISSION_DENIED_REASONS = "some reasons"; + private static final Signature SIG1 = new Signature("1234"); + private static final Signature SIG2 = new Signature("4321"); + + private static final String OTHER_UID_PACKAGE_NAME = "other.package"; + private static final String OTHER_UID_SAME_SIGNATURE_PACKAGE_NAME = "other.package.samesignature"; + private static final String OTHER_UID_NO_SIGNATURE_PACKAGE_NAME = "other.package.nosignature"; + + private Context appContext; + private PackageManager packageManager; + private SecurityPolicy policy; + @Before + public void setUp() { + appContext = ApplicationProvider.getApplicationContext(); + packageManager = appContext.getPackageManager(); + installPackage(MY_UID, appContext.getPackageName(), SIG1); + installPackage(OTHER_UID, OTHER_UID_PACKAGE_NAME, SIG2); + installPackage(OTHER_UID_SAME_SIGNATURE, OTHER_UID_SAME_SIGNATURE_PACKAGE_NAME, SIG1); + installPackage(OTHER_UID_NO_SIGNATURE, OTHER_UID_NO_SIGNATURE_PACKAGE_NAME); + } + + @SuppressWarnings("deprecation") + private void installPackage(int uid, String packageName, Signature... signatures) { + PackageInfo info = new PackageInfo(); + info.packageName = packageName; + info.signatures = signatures; + shadowOf(packageManager).installPackage(info); + shadowOf(packageManager).setPackagesForUid(uid, packageName); + } + @Test public void testInternalOnly() throws Exception { policy = SecurityPolicies.internalOnly(); @@ -53,4 +95,80 @@ public void testPermissionDenied() throws Exception { assertThat(policy.checkAuthorization(OTHER_UID).getDescription()) .isEqualTo(PERMISSION_DENIED_REASONS); } + + @Test + public void testHasSignature_succeedsIfPackageNameAndSignaturesMatch() + throws Exception { + policy = SecurityPolicies.hasSignature(packageManager, OTHER_UID_PACKAGE_NAME, SIG2); + + // THEN UID for package that has SIG2 will be authorized + assertThat(policy.checkAuthorization(OTHER_UID).getCode()).isEqualTo(Status.OK.getCode()); + } + + @Test + public void testHasSignature_failsIfPackageNameDoesNotMatch() throws Exception { + policy = SecurityPolicies.hasSignature(packageManager, appContext.getPackageName(), SIG1); + + // THEN UID for package that has SIG1 but different package name will not be authorized + assertThat(policy.checkAuthorization(OTHER_UID_SAME_SIGNATURE).getCode()) + .isEqualTo(Status.PERMISSION_DENIED.getCode()); + } + + @Test + public void testHasSignature_failsIfSignatureDoesNotMatch() throws Exception { + policy = SecurityPolicies.hasSignature(packageManager, OTHER_UID_PACKAGE_NAME, SIG1); + + // THEN UID for package that doesn't have SIG1 will not be authorized + assertThat(policy.checkAuthorization(OTHER_UID).getCode()) + .isEqualTo(Status.PERMISSION_DENIED.getCode()); + } + + @Test + public void testOneOfSignatures_succeedsIfPackageNameAndSignaturesMatch() + throws Exception { + policy = + SecurityPolicies.oneOfSignatures( + packageManager, OTHER_UID_PACKAGE_NAME, ImmutableList.of(SIG2)); + + // THEN UID for package that has SIG2 will be authorized + assertThat(policy.checkAuthorization(OTHER_UID).getCode()).isEqualTo(Status.OK.getCode()); + } + + @Test + public void testOneOfSignature_failsIfAllSignaturesDoNotMatch() throws Exception { + policy = + SecurityPolicies.oneOfSignatures( + packageManager, + appContext.getPackageName(), + ImmutableList.of(SIG1, new Signature("1314"))); + + // THEN UID for package that has SIG1 but different package name will not be authorized + assertThat(policy.checkAuthorization(OTHER_UID_SAME_SIGNATURE).getCode()) + .isEqualTo(Status.PERMISSION_DENIED.getCode()); + } + + @Test + public void testOneOfSignature_succeedsIfPackageNameAndOneOfSignaturesMatch() + throws Exception { + policy = + SecurityPolicies.oneOfSignatures( + packageManager, + OTHER_UID_PACKAGE_NAME, + ImmutableList.of(SIG1, SIG2)); + + // THEN UID for package that has SIG2 will be authorized + assertThat(policy.checkAuthorization(OTHER_UID).getCode()).isEqualTo(Status.OK.getCode()); + } + + @Test + public void testHasSignature_failsIfUidUnknown() throws Exception { + policy = + SecurityPolicies.hasSignature( + packageManager, + appContext.getPackageName(), + SIG1); + + assertThat(policy.checkAuthorization(OTHER_UID_UNKNOWN).getCode()) + .isEqualTo(Status.UNAUTHENTICATED.getCode()); + } } diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java index 8607d3996a5d..1eebaa63a8e8 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java @@ -287,8 +287,9 @@ void handleAddresses( cancelLbRpcRetryTimer(); startLbRpc(); } - // Start the fallback timer if it's never started - if (fallbackTimer == null) { + // Start the fallback timer if it's never started and we are not already using fallback + // backends. + if (fallbackTimer == null && !usingFallbackBackends) { fallbackTimer = syncContext.schedule( new FallbackModeTask(BALANCER_TIMEOUT_STATUS), FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS, timerService); diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index cb231c6c055f..293c0aa0b82a 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -1462,6 +1462,33 @@ public void grpclbFallback_noBalancerAddress() { .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); } + /** + * A test for a situation where we first only get backend addresses resolved and then in a + * later name resolution get both backend and load balancer addresses. The first instance + * will switch us to using fallback backends and it is important that in the second instance + * we do not start a fallback timer as it will fail when it triggers if the fallback backends + * are already in use. + */ + @Test + public void grpclbFallback_noTimerWhenAlreadyInFallback() { + // Initially we only get backend addresses without any LB ones. This should get us to use + // fallback backends from the start as we won't be able to even talk to the load balancer. + // No fallback timer would be started as we already started to use fallback backends. + deliverResolvedAddresses(createResolvedBalancerAddresses(1), + Collections.emptyList()); + assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + + // Later a new name resolution call happens and we get both backend and LB addresses. Since we + // are already operating with fallback backends a fallback timer should not be started to move + // us to fallback mode. + deliverResolvedAddresses(Collections.emptyList(), + createResolvedBalancerAddresses(1)); + + // If a fallback timer is started it will eventually throw an exception when it tries to switch + // us to using fallback backends when we already are using them. + assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + } + @Test public void grpclbFallback_balancerLost() { subtestGrpclbFallbackConnectionLost(true, false); diff --git a/interop-testing/build.gradle b/interop-testing/build.gradle index 944c0daab811..ef7510c17238 100644 --- a/interop-testing/build.gradle +++ b/interop-testing/build.gradle @@ -147,6 +147,13 @@ task xds_test_server(type: CreateStartScripts) { classpath = startScripts.classpath } +task xds_e2e_client(type: CreateStartScripts) { + mainClassName = "io.grpc.testing.integration.XdsInteropTest" + applicationName = "xds-e2e-test-client" + outputDir = new File(project.buildDir, 'tmp/scripts/' + name) + classpath = startScripts.classpath +} + applicationDistribution.into("bin") { from(test_client) from(test_server) @@ -157,6 +164,7 @@ applicationDistribution.into("bin") { from(grpclb_fallback_test_client) from(xds_test_client) from(xds_test_server) + from(xds_e2e_client) fileMode = 0755 } diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractXdsInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractXdsInteropTest.java new file mode 100644 index 000000000000..0bd2318b0fc5 --- /dev/null +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractXdsInteropTest.java @@ -0,0 +1,337 @@ +/* + * Copyright 2021 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.testing.integration; + +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.Any; +import com.google.protobuf.UInt32Value; +import io.grpc.Grpc; +import io.grpc.InsecureChannelCredentials; +import io.grpc.InsecureServerCredentials; +import io.grpc.ManagedChannel; +import io.grpc.NameResolverRegistry; +import io.grpc.Server; +import io.grpc.netty.NettyServerBuilder; +import io.grpc.xds.XdsNameResolverProvider; +import io.grpc.xds.XdsServerBuilder; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.core.v3.Address; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.core.v3.AggregatedConfigSource; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.core.v3.ConfigSource; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.core.v3.HealthStatus; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.core.v3.SocketAddress; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.core.v3.TrafficDirection; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.endpoint.v3.Endpoint; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.listener.v3.ApiListener; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.listener.v3.Filter; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.listener.v3.FilterChain; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.listener.v3.FilterChainMatch; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.listener.v3.Listener; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.route.v3.NonForwardingAction; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.route.v3.Route; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.route.v3.RouteAction; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.route.v3.RouteConfiguration; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.route.v3.RouteMatch; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.route.v3.VirtualHost; +import io.grpc.xds.shaded.io.envoyproxy.envoy.extensions.filters.http.router.v3.Router; +import io.grpc.xds.shaded.io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager; +import io.grpc.xds.shaded.io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter; +import io.grpc.xds.shaded.io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.Rds; + +import java.util.Collections; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Abstract base class for end-to-end xds tests. + * A local control plane is implemented in {@link XdsTestControlPlaneService}. + * Test cases can inject xds configs to the control plane for testing. + */ +public abstract class AbstractXdsInteropTest { + private static final Logger logger = Logger.getLogger(AbstractXdsInteropTest.class.getName()); + + protected static final int testServerPort = 8080; + private static final int controlPlaneServicePort = 443; + private Server server; + private Server controlPlane; + protected TestServiceGrpc.TestServiceBlockingStub blockingStub; + private ScheduledExecutorService executor; + private XdsNameResolverProvider nameResolverProvider; + private static final String scheme = "test-xds"; + private static final String serverHostName = "0.0.0.0:" + testServerPort; + private static final String SERVER_LISTENER_TEMPLATE = + "grpc/server?udpa.resource.listening_address=%s"; + private static final String rdsName = "route-config.googleapis.com"; + private static final String clusterName = "cluster0"; + private static final String edsName = "eds-service-0"; + private static final String HTTP_CONNECTION_MANAGER_TYPE_URL = + "type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3" + + ".HttpConnectionManager"; + + private static final Map defaultClientBootstrapOverride = ImmutableMap.of( + "node", ImmutableMap.of( + "id", UUID.randomUUID().toString(), + "cluster", "cluster0"), + "xds_servers", Collections.singletonList( + ImmutableMap.of( + "server_uri", "localhost:" + controlPlaneServicePort, + "channel_creds", Collections.singletonList( + ImmutableMap.of("type", "insecure") + ), + "server_features", Collections.singletonList("xds_v3") + ) + ) + ); + + /** + * Provides default client bootstrap. + * A subclass test case should override this method if it tests client bootstrap. + */ + protected Map getClientBootstrapOverride() { + return defaultClientBootstrapOverride; + } + + private static final Map defaultServerBootstrapOverride = ImmutableMap.of( + "node", ImmutableMap.of( + "id", UUID.randomUUID().toString()), + "xds_servers", Collections.singletonList( + ImmutableMap.of( + "server_uri", "localhost:" + controlPlaneServicePort, + "channel_creds", Collections.singletonList( + ImmutableMap.of("type", "insecure") + ), + "server_features", Collections.singletonList("xds_v3") + ) + ), + "server_listener_resource_name_template", SERVER_LISTENER_TEMPLATE + ); + + /** + * Provides default server bootstrap. + * A subclass test case should override this method if it tests server bootstrap. + */ + protected Map getServerBootstrapOverride() { + return defaultServerBootstrapOverride; + } + + protected void setUp() throws Exception { + startControlPlane(); + startServer(); + nameResolverProvider = XdsNameResolverProvider.createForTest(scheme, + getClientBootstrapOverride()); + NameResolverRegistry.getDefaultRegistry().register(nameResolverProvider); + ManagedChannel channel = Grpc.newChannelBuilder(scheme + ":///" + serverHostName, + InsecureChannelCredentials.create()).build(); + blockingStub = TestServiceGrpc.newBlockingStub(channel); + } + + protected void tearDown() throws Exception { + if (server != null) { + server.shutdownNow(); + if (!server.awaitTermination(5, TimeUnit.SECONDS)) { + logger.log(Level.SEVERE, "Timed out waiting for server shutdown"); + } + } + if (controlPlane != null) { + controlPlane.shutdownNow(); + if (!controlPlane.awaitTermination(5, TimeUnit.SECONDS)) { + logger.log(Level.SEVERE, "Timed out waiting for server shutdown"); + } + } + if (executor != null) { + MoreExecutors.shutdownAndAwaitTermination(executor, 5, TimeUnit.SECONDS); + } + NameResolverRegistry.getDefaultRegistry().deregister(nameResolverProvider); + } + + protected void startServer() throws Exception { + executor = Executors.newSingleThreadScheduledExecutor(); + XdsServerBuilder serverBuilder = XdsServerBuilder.forPort( + testServerPort, InsecureServerCredentials.create()) + .addService(new TestServiceImpl(executor)) + .overrideBootstrapForTest(getServerBootstrapOverride()); + server = serverBuilder.build().start(); + } + + /** + * Provides default control plane xds configs. + * A subclass test case should override this method to inject control plane xds configs to verify + * end-to-end behavior. + */ + protected XdsTestControlPlaneService.XdsTestControlPlaneConfig getControlPlaneConfig() { + String tcpListenerName = SERVER_LISTENER_TEMPLATE.replaceAll("%s", serverHostName); + return new XdsTestControlPlaneService.XdsTestControlPlaneConfig( + Collections.singletonList(serverListener(tcpListenerName, serverHostName)), + Collections.singletonList(clientListener(serverHostName)), + Collections.singletonList(rds(serverHostName)), + Collections.singletonList(cds()), + Collections.singletonList(eds(testServerPort)) + ); + } + + private void startControlPlane() throws Exception { + XdsTestControlPlaneService.XdsTestControlPlaneConfig controlPlaneConfig = + getControlPlaneConfig(); + logger.log(Level.FINER, "Starting control plane with config: {0}", controlPlaneConfig); + XdsTestControlPlaneService controlPlaneService = new XdsTestControlPlaneService( + controlPlaneConfig); + NettyServerBuilder controlPlaneServerBuilder = + NettyServerBuilder.forPort(controlPlaneServicePort) + .addService(controlPlaneService); + controlPlane = controlPlaneServerBuilder.build().start(); + } + + /** + * A subclass test case should override this method to verify end-to-end behaviour. + */ + abstract void run(); + + private static Listener clientListener(String name) { + HttpFilter httpFilter = HttpFilter.newBuilder() + .setName("terminal-filter") + .setTypedConfig(Any.pack(Router.newBuilder().build())) + .setIsOptional(true) + .build(); + ApiListener apiListener = ApiListener.newBuilder().setApiListener(Any.pack( + HttpConnectionManager.newBuilder() + .setRds( + Rds.newBuilder() + .setRouteConfigName(rdsName) + .setConfigSource( + ConfigSource.newBuilder() + .setAds(AggregatedConfigSource.getDefaultInstance()))) + .addAllHttpFilters(Collections.singletonList(httpFilter)) + .build(), + HTTP_CONNECTION_MANAGER_TYPE_URL) + ).build(); + Listener listener = Listener.newBuilder() + .setName(name) + .setApiListener(apiListener).build(); + return listener; + } + + private static Listener serverListener(String name, String authority) { + HttpFilter routerFilter = HttpFilter.newBuilder() + .setName("terminal-filter") + .setTypedConfig( + Any.pack(Router.newBuilder().build())) + .setIsOptional(true) + .build(); + VirtualHost virtualHost = VirtualHost.newBuilder() + .setName("virtual-host-0") + .addDomains(authority) + .addRoutes( + Route.newBuilder() + .setMatch( + RouteMatch.newBuilder().setPrefix("/").build() + ) + .setNonForwardingAction(NonForwardingAction.newBuilder().build()) + .build() + ).build(); + RouteConfiguration routeConfig = RouteConfiguration.newBuilder() + .addVirtualHosts(virtualHost) + .build(); + Filter filter = Filter.newBuilder() + .setName("network-filter-0") + .setTypedConfig( + Any.pack( + HttpConnectionManager.newBuilder() + .setRouteConfig(routeConfig) + .addAllHttpFilters(Collections.singletonList(routerFilter)) + .build() + ) + ).build(); + FilterChainMatch filterChainMatch = FilterChainMatch.newBuilder() + .setSourceType(FilterChainMatch.ConnectionSourceType.ANY) + .build(); + FilterChain filterChain = FilterChain.newBuilder() + .setName("filter-chain-0") + .setFilterChainMatch(filterChainMatch) + .addFilters(filter) + .build(); + return Listener.newBuilder() + .setName(name) + .setTrafficDirection(TrafficDirection.INBOUND) + .addFilterChains(filterChain) + .build(); + } + + private static RouteConfiguration rds(String authority) { + VirtualHost virtualHost = VirtualHost.newBuilder() + .addDomains(authority) + .addRoutes( + Route.newBuilder() + .setMatch( + RouteMatch.newBuilder().setPrefix("/").build() + ) + .setRoute( + RouteAction.newBuilder().setCluster(clusterName).build() + ) + .build()) + .build(); + return RouteConfiguration.newBuilder().setName(rdsName).addVirtualHosts(virtualHost).build(); + } + + private static Cluster cds() { + return Cluster.newBuilder() + .setName(clusterName) + .setType(Cluster.DiscoveryType.EDS) + .setEdsClusterConfig( + Cluster.EdsClusterConfig.newBuilder() + .setServiceName(edsName) + .setEdsConfig( + ConfigSource.newBuilder() + .setAds(AggregatedConfigSource.newBuilder().build()) + .build()) + .build() + ) + .setLbPolicy(Cluster.LbPolicy.ROUND_ROBIN) + .build(); + } + + private static ClusterLoadAssignment eds(int port) { + Address address = Address.newBuilder() + .setSocketAddress( + SocketAddress.newBuilder().setAddress("0.0.0.0").setPortValue(port).build() + ) + .build(); + LocalityLbEndpoints endpoints = LocalityLbEndpoints.newBuilder() + .setLoadBalancingWeight(UInt32Value.of(10)) + .setPriority(0) + .addLbEndpoints( + LbEndpoint.newBuilder() + .setEndpoint( + Endpoint.newBuilder().setAddress(address).build()) + .setHealthStatus(HealthStatus.HEALTHY) + .build() + ) + .build(); + return ClusterLoadAssignment.newBuilder() + .setClusterName(edsName) + .addEndpoints(endpoints) + .build(); + } +} diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/XdsInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/XdsInteropTest.java new file mode 100644 index 000000000000..410b65d37d98 --- /dev/null +++ b/interop-testing/src/main/java/io/grpc/testing/integration/XdsInteropTest.java @@ -0,0 +1,57 @@ +/* + * Copyright 2021 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.testing.integration; + +import static org.junit.Assert.assertEquals; + +import com.google.protobuf.ByteString; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class XdsInteropTest { + private static final Logger logger = Logger.getLogger(XdsInteropTest.class.getName()); + + /** + * The main application to run test cases. + */ + public static void main(String[] args) throws Exception { + AbstractXdsInteropTest testCase = new PingPong(); + testCase.setUp(); + try { + testCase.run(); + } finally { + testCase.tearDown(); + } + } + + private static class PingPong extends AbstractXdsInteropTest { + @Override + void run() { + Messages.SimpleRequest request = Messages.SimpleRequest.newBuilder() + .setResponseSize(3141) + .setPayload(Messages.Payload.newBuilder() + .setBody(ByteString.copyFrom(new byte[2728]))) + .build(); + Messages.SimpleResponse goldenResponse = Messages.SimpleResponse.newBuilder() + .setPayload(Messages.Payload.newBuilder() + .setBody(ByteString.copyFrom(new byte[3141]))) + .build(); + assertEquals(goldenResponse.getPayload(), blockingStub.unaryCall(request).getPayload()); + logger.log(Level.INFO, "success"); + } + } +} diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestControlPlaneService.java b/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestControlPlaneService.java new file mode 100644 index 000000000000..06a4d2467c94 --- /dev/null +++ b/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestControlPlaneService.java @@ -0,0 +1,269 @@ +/* + * Copyright 2021 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package io.grpc.testing.integration; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.Any; +import io.grpc.SynchronizationContext; +import io.grpc.stub.StreamObserver; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.listener.v3.Listener; +import io.grpc.xds.shaded.io.envoyproxy.envoy.config.route.v3.RouteConfiguration; +import io.grpc.xds.shaded.io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc; +import io.grpc.xds.shaded.io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; +import io.grpc.xds.shaded.io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class XdsTestControlPlaneService extends + AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase { + private static final Logger logger = Logger.getLogger(XdsInteropTest.class.getName()); + + private final SynchronizationContext syncContext = new SynchronizationContext( + new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + logger.log(Level.SEVERE, "Exception!" + e); + } + }); + + private static final String ADS_TYPE_URL_LDS = + "type.googleapis.com/envoy.config.listener.v3.Listener"; + private static final String ADS_TYPE_URL_RDS = + "type.googleapis.com/envoy.config.route.v3.RouteConfiguration"; + private static final String ADS_TYPE_URL_CDS = + "type.googleapis.com/envoy.config.cluster.v3.Cluster"; + private static final String ADS_TYPE_URL_EDS = + "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment"; + private final ImmutableMap ldsResources; + private final ImmutableMap rdsResources; + private final ImmutableMap cdsResources; + private final ImmutableMap edsResources; + private int ldsVersion = 1; + private int rdsVersion = 1; + private int cdsVersion = 1; + private int edsVersion = 1; + private int ldsNonce = 0; + private int rdsNonce = 0; + private int cdsNonce = 0; + private int edsNonce = 0; + + /** + * Create a control plane service for testing, with static xds configurations. + */ + public XdsTestControlPlaneService(XdsTestControlPlaneConfig config) { + Map ldsMap = new HashMap<>(); + for (Listener apiListener: config.apiListener) { + ldsMap.put(apiListener.getName(), apiListener); + } + for (Listener tcpListener: config.tcpListener) { + ldsMap.put(tcpListener.getName(), tcpListener); + } + this.ldsResources = ImmutableMap.copyOf(ldsMap); + + Map rdsMap = new HashMap<>(); + for (RouteConfiguration rds:config.rds) { + rdsMap.put(rds.getName(), rds); + } + this.rdsResources = ImmutableMap.copyOf(rdsMap); + + Map cdsMap = new HashMap<>(); + for (Cluster cds:config.cds) { + cdsMap.put(cds.getName(), cds); + } + this.cdsResources = ImmutableMap.copyOf(cdsMap); + + Map edsMap = new HashMap<>(); + for (ClusterLoadAssignment eds:config.eds) { + edsMap.put(eds.getClusterName(), eds); + } + this.edsResources = ImmutableMap.copyOf(edsMap); + logger.log(Level.FINER, "control plane config created. " + + "Dumping resources lds:{0},\nrds:{1},\ncds:{2},\neds:{3}", + new Object[]{ldsMap, rdsMap, cdsMap, edsMap}); + } + + public static class XdsTestControlPlaneConfig { + ImmutableList tcpListener; + ImmutableList apiListener; + ImmutableList rds; + ImmutableList cds; + ImmutableList eds; + + /** + * Provides control plane xds configurations. + */ + public XdsTestControlPlaneConfig(List tcpListener, + List apiListener, + List rds, + List cds, + List eds) { + this.tcpListener = ImmutableList.copyOf(tcpListener); + this.apiListener = ImmutableList.copyOf(apiListener); + this.rds = ImmutableList.copyOf(rds); + this.cds = ImmutableList.copyOf(cds); + this.eds = ImmutableList.copyOf(eds); + } + } + + @Override + public StreamObserver streamAggregatedResources( + final StreamObserver responseObserver) { + final StreamObserver requestObserver = + new StreamObserver() { + @Override + public void onNext(final DiscoveryRequest value) { + syncContext.execute(new Runnable() { + @Override + public void run() { + logger.log(Level.FINEST, "control plane received request {0}", value); + if (value.hasErrorDetail()) { + logger.log(Level.FINE, "control plane received nack resource {0}, error {1}", + new Object[]{value.getResourceNamesList(), value.getErrorDetail()}); + return; + } + if (value.getResourceNamesCount() <= 0) { + return; + } + switch (value.getTypeUrl()) { + case ADS_TYPE_URL_LDS: + if (!value.getResponseNonce().isEmpty() + && !String.valueOf(ldsNonce).equals(value.getResponseNonce())) { + logger.log(Level.FINE, "lds resource nonce does not match, ignore."); + return; + } + if (String.valueOf(ldsVersion).equals(value.getVersionInfo())) { + logger.log(Level.FINEST, "control plane received ack for lds resource: {0}", + value.getResourceNamesList()); + return; + } + DiscoveryResponse.Builder responseBuilder = DiscoveryResponse.newBuilder() + .setTypeUrl(ADS_TYPE_URL_LDS) + .setVersionInfo(String.valueOf(ldsVersion++)) + .setNonce(String.valueOf(++ldsNonce)); + for (String ldsName: value.getResourceNamesList()) { + if (ldsResources.containsKey(ldsName)) { + responseBuilder.addResources(Any.pack( + ldsResources.get(ldsName), + ADS_TYPE_URL_LDS + )); + } + } + responseObserver.onNext(responseBuilder.build()); + break; + case ADS_TYPE_URL_RDS: + if (!value.getResponseNonce().isEmpty() + && !String.valueOf(rdsNonce).equals(value.getResponseNonce())) { + logger.log(Level.FINE, "rds resource nonce does not match, ignore."); + return; + } + if (String.valueOf(rdsVersion).equals(value.getVersionInfo())) { + logger.log(Level.FINEST, "control plane received ack for rds resource: {0}", + value.getResourceNamesList()); + return; + } + responseBuilder = DiscoveryResponse.newBuilder() + .setTypeUrl(ADS_TYPE_URL_RDS) + .setVersionInfo(String.valueOf(rdsVersion++)) + .setNonce(String.valueOf(++rdsNonce)); + for (String rdsName: value.getResourceNamesList()) { + if (rdsResources.containsKey(rdsName)) { + responseBuilder.addResources(Any.pack( + rdsResources.get(rdsName), + ADS_TYPE_URL_RDS + )); + } + } + responseObserver.onNext(responseBuilder.build()); + break; + case ADS_TYPE_URL_CDS: + if (!value.getResponseNonce().isEmpty() + && !String.valueOf(cdsNonce).equals(value.getResponseNonce())) { + logger.log(Level.FINE, "cds resource nonce does not match, ignore."); + return; + } + if (String.valueOf(cdsVersion).equals(value.getVersionInfo())) { + logger.log(Level.FINEST, "control plane received ack for cds resource: {0}", + value.getResourceNamesList()); + return; + } + responseBuilder = DiscoveryResponse.newBuilder() + .setTypeUrl(ADS_TYPE_URL_CDS) + .setVersionInfo(String.valueOf(cdsVersion++)) + .setNonce(String.valueOf(++cdsNonce)); + for (String cdsName: value.getResourceNamesList()) { + if (cdsResources.containsKey(cdsName)) { + responseBuilder.addResources(Any.pack( + cdsResources.get(cdsName), + ADS_TYPE_URL_CDS + )); + } + } + responseObserver.onNext(responseBuilder.build()); + break; + case ADS_TYPE_URL_EDS: + if (!value.getResponseNonce().isEmpty() + && !String.valueOf(edsNonce).equals(value.getResponseNonce())) { + logger.log(Level.FINE, "eds resource nonce does not match, ignore."); + return; + } + if (String.valueOf(edsVersion).equals(value.getVersionInfo())) { + logger.log(Level.FINEST, "control plane received ack for eds resource: {0}", + value.getResourceNamesList()); + return; + } + responseBuilder = DiscoveryResponse.newBuilder() + .setTypeUrl(ADS_TYPE_URL_EDS) + .setVersionInfo(String.valueOf(edsVersion++)) + .setNonce(String.valueOf(++edsNonce)); + for (String edsName: value.getResourceNamesList()) { + if (edsResources.containsKey(edsName)) { + responseBuilder.addResources(Any.pack( + edsResources.get(value.getResourceNames(0)), + ADS_TYPE_URL_EDS + )); + } + } + responseObserver.onNext(responseBuilder.build()); + break; + default: + logger.log(Level.WARNING, "unrecognized typeUrl in discoveryRequest: {0}", + value.getTypeUrl()); + } + } + }); + } + + @Override + public void onError(Throwable t) { + logger.log(Level.FINE, "Control plane error: {0} ", t); + } + + @Override + public void onCompleted() { + responseObserver.onCompleted(); + } + }; + return requestObserver; + } +} diff --git a/rls/src/main/proto/grpc/lookup/v1/rls.proto b/rls/src/main/proto/grpc/lookup/v1/rls.proto index d9dd6c246f24..7d1735289d56 100644 --- a/rls/src/main/proto/grpc/lookup/v1/rls.proto +++ b/rls/src/main/proto/grpc/lookup/v1/rls.proto @@ -22,14 +22,6 @@ option java_package = "io.grpc.lookup.v1"; option java_outer_classname = "RlsProto"; message RouteLookupRequest { - // Full host name of the target server, e.g. firestore.googleapis.com. - // Only set for gRPC requests; HTTP requests must use key_map explicitly. - // Deprecated in favor of setting key_map keys with GrpcKeyBuilder.extra_keys. - string server = 1 [deprecated = true]; - // Full path of the request, i.e. "/service/method". - // Only set for gRPC requests; HTTP requests must use key_map explicitly. - // Deprecated in favor of setting key_map keys with GrpcKeyBuilder.extra_keys. - string path = 2 [deprecated = true]; // Target type allows the client to specify what kind of target format it // would like from RLS to allow it to find the regional server, e.g. "grpc". string target_type = 3; @@ -41,8 +33,13 @@ message RouteLookupRequest { } // Reason for making this request. Reason reason = 5; + // For REASON_STALE, the header_data from the stale response, if any. + string stale_header_data = 6; // Map of key values extracted via key builders for the gRPC or HTTP request. map key_map = 4; + + reserved 1, 2; + reserved "server", "path"; } message RouteLookupResponse { diff --git a/rls/src/main/proto/grpc/lookup/v1/rls_config.proto b/rls/src/main/proto/grpc/lookup/v1/rls_config.proto index db99a8949ea9..9d2b6c54cfb4 100644 --- a/rls/src/main/proto/grpc/lookup/v1/rls_config.proto +++ b/rls/src/main/proto/grpc/lookup/v1/rls_config.proto @@ -216,3 +216,10 @@ message RouteLookupConfig { reserved 10; reserved "request_processing_strategy"; } + +// RouteLookupClusterSpecifier is used in xDS to represent a cluster specifier +// plugin for RLS. +message RouteLookupClusterSpecifier { + // The RLS config for this cluster specifier plugin instance. + RouteLookupConfig route_lookup_config = 1; +} diff --git a/testing/src/main/java/io/grpc/internal/testing/FakeNameResolverProvider.java b/testing/src/main/java/io/grpc/internal/testing/FakeNameResolverProvider.java new file mode 100644 index 000000000000..d056707b7196 --- /dev/null +++ b/testing/src/main/java/io/grpc/internal/testing/FakeNameResolverProvider.java @@ -0,0 +1,94 @@ +/* + * Copyright 2021 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal.testing; + +import com.google.common.collect.ImmutableList; +import io.grpc.EquivalentAddressGroup; +import io.grpc.NameResolver; +import io.grpc.NameResolverProvider; +import io.grpc.Status; +import java.net.SocketAddress; +import java.net.URI; + +/** A name resolver to always resolve the given URI into the given address. */ +public final class FakeNameResolverProvider extends NameResolverProvider { + + private final URI targetUri; + private final SocketAddress address; + + public FakeNameResolverProvider(String targetUri, SocketAddress address) { + this.targetUri = URI.create(targetUri); + this.address = address; + } + + @Override + public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) { + if (targetUri.equals(this.targetUri)) { + return new FakeNameResolver(address); + } + return null; + } + + @Override + protected boolean isAvailable() { + return true; + } + + @Override + protected int priority() { + return 5; // Default + } + + @Override + public String getDefaultScheme() { + return targetUri.getScheme(); + } + + /** A single name resolver. */ + private static final class FakeNameResolver extends NameResolver { + private static final String AUTHORITY = "fake-authority"; + + private final SocketAddress address; + private volatile boolean shutdown; + + private FakeNameResolver(SocketAddress address) { + this.address = address; + } + + @Override + public void start(Listener2 listener) { + if (shutdown) { + listener.onError(Status.FAILED_PRECONDITION.withDescription("Resolver is shutdown")); + } else { + listener.onResult( + ResolutionResult.newBuilder() + .setAddresses(ImmutableList.of(new EquivalentAddressGroup(address))) + .build()); + } + } + + @Override + public String getServiceAuthority() { + return AUTHORITY; + } + + @Override + public void shutdown() { + shutdown = true; + } + } +} diff --git a/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java b/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java index a6c3c2feb99a..e29949eb8e58 100644 --- a/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java +++ b/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java @@ -28,6 +28,7 @@ import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; +import io.grpc.Channel; import io.grpc.Context; import io.grpc.InternalLogId; import io.grpc.ManagedChannel; @@ -36,6 +37,11 @@ import io.grpc.SynchronizationContext.ScheduledHandle; import io.grpc.internal.BackoffPolicy; import io.grpc.stub.StreamObserver; +import io.grpc.xds.Bootstrapper.ServerInfo; +import io.grpc.xds.ClientXdsClient.XdsChannelFactory; +import io.grpc.xds.EnvoyProtoData.Node; +import io.grpc.xds.XdsClient.ResourceStore; +import io.grpc.xds.XdsClient.XdsResponseHandler; import io.grpc.xds.XdsLogger.XdsLogLevel; import java.util.Collection; import java.util.Collections; @@ -48,7 +54,7 @@ * Common base type for XdsClient implementations, which encapsulates the layer abstraction of * the xDS RPC stream. */ -abstract class AbstractXdsClient extends XdsClient { +final class AbstractXdsClient { private static final String ADS_TYPE_URL_LDS_V2 = "type.googleapis.com/envoy.api.v2.Listener"; private static final String ADS_TYPE_URL_LDS = @@ -66,27 +72,18 @@ abstract class AbstractXdsClient extends XdsClient { private static final String ADS_TYPE_URL_EDS = "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment"; - private final SynchronizationContext syncContext = new SynchronizationContext( - new Thread.UncaughtExceptionHandler() { - @Override - public void uncaughtException(Thread t, Throwable e) { - getLogger().log( - XdsLogLevel.ERROR, - "Uncaught exception in XdsClient SynchronizationContext. Panic!", - e); - // TODO(chengyuanzhang): better error handling. - throw new AssertionError(e); - } - }); - private final MessagePrinter msgPrinter = new MessagePrinter(); + private final SynchronizationContext syncContext; private final InternalLogId logId; private final XdsLogger logger; + private final ServerInfo serverInfo; private final ManagedChannel channel; + private final XdsResponseHandler xdsResponseHandler; + private final ResourceStore resourceStore; private final Context context; private final ScheduledExecutorService timeService; private final BackoffPolicy.Provider backoffPolicyProvider; private final Stopwatch stopwatch; - private final Bootstrapper.BootstrapInfo bootstrapInfo; + private final Node bootstrapNode; // Last successfully applied version_info for each resource type. Starts with empty string. // A version_info is used to update management server with client's most recent knowledge of @@ -104,71 +101,42 @@ public void uncaughtException(Thread t, Throwable e) { @Nullable private ScheduledHandle rpcRetryTimer; - AbstractXdsClient(ManagedChannel channel, Bootstrapper.BootstrapInfo bootstrapInfo, - Context context, ScheduledExecutorService timeService, - BackoffPolicy.Provider backoffPolicyProvider, Supplier stopwatchSupplier) { - this.channel = checkNotNull(channel, "channel"); - this.bootstrapInfo = checkNotNull(bootstrapInfo, "bootstrapInfo"); + /** An entity that manages ADS RPCs over a single channel. */ + // TODO: rename to XdsChannel + AbstractXdsClient( + XdsChannelFactory xdsChannelFactory, + ServerInfo serverInfo, + Node bootstrapNode, + XdsResponseHandler xdsResponseHandler, + ResourceStore resourceStore, + Context context, + ScheduledExecutorService + timeService, + SynchronizationContext syncContext, + BackoffPolicy.Provider backoffPolicyProvider, + Supplier stopwatchSupplier) { + this.serverInfo = checkNotNull(serverInfo, "serverInfo"); + this.channel = checkNotNull(xdsChannelFactory, "xdsChannelFactory").create(serverInfo); + this.xdsResponseHandler = checkNotNull(xdsResponseHandler, "xdsResponseHandler"); + this.resourceStore = checkNotNull(resourceStore, "resourcesSubscriber"); + this.bootstrapNode = checkNotNull(bootstrapNode, "bootstrapNode"); this.context = checkNotNull(context, "context"); this.timeService = checkNotNull(timeService, "timeService"); + this.syncContext = checkNotNull(syncContext, "syncContext"); this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); stopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get(); - logId = InternalLogId.allocate("xds-client", null); + logId = InternalLogId.allocate("xds-client", serverInfo.target()); logger = XdsLogger.withLogId(logId); logger.log(XdsLogLevel.INFO, "Created"); } - /** - * Called when an LDS response is received. - */ - // Must be synchronized. - protected void handleLdsResponse(String versionInfo, List resources, String nonce) { - } - - /** - * Called when a RDS response is received. - */ - // Must be synchronized. - protected void handleRdsResponse(String versionInfo, List resources, String nonce) { - } - - /** - * Called when a CDS response is received. - */ - // Must be synchronized. - protected void handleCdsResponse(String versionInfo, List resources, String nonce) { - } - - /** - * Called when an EDS response is received. - */ - // Must be synchronized. - protected void handleEdsResponse(String versionInfo, List resources, String nonce) { - } - - /** - * Called when the ADS stream is closed passively. - */ - // Must be synchronized. - protected void handleStreamClosed(Status error) { - } - - /** - * Called when the ADS stream has been recreated. - */ - // Must be synchronized. - protected void handleStreamRestarted() { - } - - /** - * Called when being shut down. - */ - // Must be synchronized. - protected void handleShutdown() { + /** The underlying channel. */ + // Currently, only externally used for LrsClient. + Channel channel() { + return channel; } - @Override - final void shutdown() { + void shutdown() { syncContext.execute(new Runnable() { @Override public void run() { @@ -180,49 +148,28 @@ public void run() { if (rpcRetryTimer != null && rpcRetryTimer.isPending()) { rpcRetryTimer.cancel(); } - handleShutdown(); + channel.shutdown(); } }); } - @Override - boolean isShutDown() { - return shutdown; - } - - @Override - Bootstrapper.BootstrapInfo getBootstrapInfo() { - return bootstrapInfo; - } - @Override public String toString() { return logId.toString(); } - /** - * Returns the collection of resources currently subscribing to or {@code null} if not - * subscribing to any resources for the given type. - * - *

Note an empty collection indicates subscribing to resources of the given type with - * wildcard mode. - */ - // Must be synchronized. - @Nullable - abstract Collection getSubscribedResources(ResourceType type); - /** * Updates the resource subscription for the given resource type. */ // Must be synchronized. - protected final void adjustResourceSubscription(ResourceType type) { + void adjustResourceSubscription(ResourceType type) { if (isInBackoff()) { return; } if (adsStream == null) { startRpcStream(); } - Collection resources = getSubscribedResources(type); + Collection resources = resourceStore.getSubscribedResources(serverInfo, type); if (resources != null) { adsStream.sendDiscoveryRequest(type, resources); } @@ -233,7 +180,7 @@ protected final void adjustResourceSubscription(ResourceType type) { * and sends an ACK request to the management server. */ // Must be synchronized. - protected final void ackResponse(ResourceType type, String versionInfo, String nonce) { + void ackResponse(ResourceType type, String versionInfo, String nonce) { switch (type) { case LDS: ldsVersion = versionInfo; @@ -253,7 +200,7 @@ protected final void ackResponse(ResourceType type, String versionInfo, String n } logger.log(XdsLogLevel.INFO, "Sending ACK for {0} update, nonce: {1}, current version: {2}", type, nonce, versionInfo); - Collection resources = getSubscribedResources(type); + Collection resources = resourceStore.getSubscribedResources(serverInfo, type); if (resources == null) { resources = Collections.emptyList(); } @@ -265,34 +212,22 @@ protected final void ackResponse(ResourceType type, String versionInfo, String n * accepted version) to the management server. */ // Must be synchronized. - protected final void nackResponse(ResourceType type, String nonce, String errorDetail) { + void nackResponse(ResourceType type, String nonce, String errorDetail) { String versionInfo = getCurrentVersion(type); logger.log(XdsLogLevel.INFO, "Sending NACK for {0} update, nonce: {1}, current version: {2}", type, nonce, versionInfo); - Collection resources = getSubscribedResources(type); + Collection resources = resourceStore.getSubscribedResources(serverInfo, type); if (resources == null) { resources = Collections.emptyList(); } adsStream.sendDiscoveryRequest(type, versionInfo, resources, nonce, errorDetail); } - protected final SynchronizationContext getSyncContext() { - return syncContext; - } - - protected final ScheduledExecutorService getTimeService() { - return timeService; - } - - protected final XdsLogger getLogger() { - return logger; - } - /** * Returns {@code true} if the resource discovery is currently in backoff. */ // Must be synchronized. - protected final boolean isInBackoff() { + boolean isInBackoff() { return rpcRetryTimer != null && rpcRetryTimer.isPending(); } @@ -303,7 +238,7 @@ protected final boolean isInBackoff() { // Must be synchronized. private void startRpcStream() { checkState(adsStream == null, "Previous adsStream has not been cleared yet"); - if (bootstrapInfo.servers().get(0).useProtocolV3()) { + if (serverInfo.useProtocolV3()) { adsStream = new AdsStreamV3(); } else { adsStream = new AdsStreamV2(); @@ -318,8 +253,8 @@ private void startRpcStream() { stopwatch.reset().start(); } + /** Returns the latest accepted version of the given resource type. */ // Must be synchronized. - @Override String getCurrentVersion(ResourceType type) { String version; switch (type) { @@ -354,16 +289,16 @@ public void run() { if (type == ResourceType.UNKNOWN) { continue; } - Collection resources = getSubscribedResources(type); + Collection resources = resourceStore.getSubscribedResources(serverInfo, type); if (resources != null) { adsStream.sendDiscoveryRequest(type, resources); } } - handleStreamRestarted(); + xdsResponseHandler.handleStreamRestarted(serverInfo); } } - protected enum ResourceType { + enum ResourceType { UNKNOWN, LDS, RDS, CDS, EDS; String typeUrl() { @@ -489,19 +424,19 @@ final void handleRpcResponse( switch (type) { case LDS: ldsRespNonce = nonce; - handleLdsResponse(versionInfo, resources, nonce); + xdsResponseHandler.handleLdsResponse(serverInfo, versionInfo, resources, nonce); break; case RDS: rdsRespNonce = nonce; - handleRdsResponse(versionInfo, resources, nonce); + xdsResponseHandler.handleRdsResponse(serverInfo, versionInfo, resources, nonce); break; case CDS: cdsRespNonce = nonce; - handleCdsResponse(versionInfo, resources, nonce); + xdsResponseHandler.handleCdsResponse(serverInfo, versionInfo, resources, nonce); break; case EDS: edsRespNonce = nonce; - handleEdsResponse(versionInfo, resources, nonce); + xdsResponseHandler.handleEdsResponse(serverInfo, versionInfo, resources, nonce); break; case UNKNOWN: default: @@ -527,7 +462,7 @@ private void handleRpcStreamClosed(Status error) { "ADS stream closed with status {0}: {1}. Cause: {2}", error.getCode(), error.getDescription(), error.getCause()); closed = true; - handleStreamClosed(error); + xdsResponseHandler.handleStreamClosed(error); cleanUp(); if (responseReceived || retryBackoffPolicy == null) { // Reset the backoff sequence if had received a response, or backoff sequence @@ -580,8 +515,9 @@ public void onNext(final io.envoyproxy.envoy.api.v2.DiscoveryResponse response) public void run() { ResourceType type = ResourceType.fromTypeUrl(response.getTypeUrl()); if (logger.isLoggable(XdsLogLevel.DEBUG)) { - logger.log(XdsLogLevel.DEBUG, "Received {0} response:\n{1}", - type, msgPrinter.print(response)); + logger.log( + XdsLogLevel.DEBUG, "Received {0} response:\n{1}", type, + MessagePrinter.print(response)); } handleRpcResponse(type, response.getVersionInfo(), response.getResourcesList(), response.getNonce()); @@ -619,7 +555,7 @@ void sendDiscoveryRequest(ResourceType type, String versionInfo, Collection serverChannelMap = new HashMap<>(); private final Map ldsResourceSubscribers = new HashMap<>(); private final Map rdsResourceSubscribers = new HashMap<>(); private final Map cdsResourceSubscribers = new HashMap<>(); private final Map edsResourceSubscribers = new HashMap<>(); private final LoadStatsManager2 loadStatsManager; - private final LoadReportClient lrsClient; + private final Map serverLrsClientMap = new HashMap<>(); + private final XdsChannelFactory xdsChannelFactory; + private final Bootstrapper.BootstrapInfo bootstrapInfo; + private final Context context; + private final ScheduledExecutorService timeService; + private final BackoffPolicy.Provider backoffPolicyProvider; + private final Supplier stopwatchSupplier; private final TimeProvider timeProvider; private boolean reportingLoad; private final TlsContextManager tlsContextManager; + private final InternalLogId logId; + private final XdsLogger logger; + private volatile boolean isShutdown; + // TODO(zdapeng): rename to XdsClientImpl ClientXdsClient( - ManagedChannel channel, Bootstrapper.BootstrapInfo bootstrapInfo, Context context, - ScheduledExecutorService timeService, BackoffPolicy.Provider backoffPolicyProvider, - Supplier stopwatchSupplier, TimeProvider timeProvider, + XdsChannelFactory xdsChannelFactory, + Bootstrapper.BootstrapInfo bootstrapInfo, + Context context, + ScheduledExecutorService timeService, + BackoffPolicy.Provider backoffPolicyProvider, + Supplier stopwatchSupplier, + TimeProvider timeProvider, TlsContextManager tlsContextManager) { - super(channel, bootstrapInfo, context, timeService, backoffPolicyProvider, stopwatchSupplier); + this.xdsChannelFactory = xdsChannelFactory; + this.bootstrapInfo = bootstrapInfo; + this.context = context; + this.timeService = timeService; loadStatsManager = new LoadStatsManager2(stopwatchSupplier); + this.backoffPolicyProvider = backoffPolicyProvider; + this.stopwatchSupplier = stopwatchSupplier; this.timeProvider = timeProvider; this.tlsContextManager = checkNotNull(tlsContextManager, "tlsContextManager"); - lrsClient = new LoadReportClient(loadStatsManager, channel, context, - bootstrapInfo.servers().get(0).useProtocolV3(), bootstrapInfo.node(), - getSyncContext(), timeService, backoffPolicyProvider, stopwatchSupplier); + logId = InternalLogId.allocate("xds-client", null); + logger = XdsLogger.withLogId(logId); + logger.log(XdsLogLevel.INFO, "Created"); + } + + private void maybeCreateXdsChannelWithLrs(ServerInfo serverInfo) { + syncContext.throwIfNotInThisSynchronizationContext(); + if (serverChannelMap.containsKey(serverInfo)) { + return; + } + AbstractXdsClient xdsChannel = new AbstractXdsClient( + xdsChannelFactory, + serverInfo, + bootstrapInfo.node(), + this, + this, + context, + timeService, + syncContext, + backoffPolicyProvider, + stopwatchSupplier); + LoadReportClient lrsClient = new LoadReportClient( + loadStatsManager, xdsChannel.channel(), context, serverInfo.useProtocolV3(), + bootstrapInfo.node(), syncContext, timeService, backoffPolicyProvider, stopwatchSupplier); + serverChannelMap.put(serverInfo, xdsChannel); + serverLrsClientMap.put(serverInfo, lrsClient); } @Override - protected void handleLdsResponse(String versionInfo, List resources, String nonce) { + public void handleLdsResponse( + ServerInfo serverInfo, String versionInfo, List resources, String nonce) { + syncContext.throwIfNotInThisSynchronizationContext(); Map parsedResources = new HashMap<>(resources.size()); Set unpackedResources = new HashSet<>(resources.size()); Set invalidResources = new HashSet<>(); @@ -233,12 +299,12 @@ protected void handleLdsResponse(String versionInfo, List resources, String // LdsUpdate parsed successfully. parsedResources.put(listenerName, new ParsedResource(ldsUpdate, resource)); } - getLogger().log(XdsLogLevel.INFO, + logger.log(XdsLogLevel.INFO, "Received LDS Response version {0} nonce {1}. Parsed resources: {2}", versionInfo, nonce, unpackedResources); handleResourceUpdate( - ResourceType.LDS, parsedResources, invalidResources, retainedRdsResources, versionInfo, - nonce, errors); + serverInfo, ResourceType.LDS, parsedResources, invalidResources, retainedRdsResources, + versionInfo, nonce, errors); } private LdsUpdate processClientSideListener( @@ -1307,7 +1373,9 @@ static StructOrError parseClusterWeight( } @Override - protected void handleRdsResponse(String versionInfo, List resources, String nonce) { + public void handleRdsResponse( + ServerInfo serverInfo, String versionInfo, List resources, String nonce) { + syncContext.throwIfNotInThisSynchronizationContext(); Map parsedResources = new HashMap<>(resources.size()); Set unpackedResources = new HashSet<>(resources.size()); Set invalidResources = new HashSet<>(); @@ -1344,12 +1412,12 @@ protected void handleRdsResponse(String versionInfo, List resources, String parsedResources.put(routeConfigName, new ParsedResource(rdsUpdate, resource)); } - getLogger().log(XdsLogLevel.INFO, + logger.log(XdsLogLevel.INFO, "Received RDS Response version {0} nonce {1}. Parsed resources: {2}", versionInfo, nonce, unpackedResources); handleResourceUpdate( - ResourceType.RDS, parsedResources, invalidResources, Collections.emptySet(), - versionInfo, nonce, errors); + serverInfo, ResourceType.RDS, parsedResources, invalidResources, + Collections.emptySet(), versionInfo, nonce, errors); } private static RdsUpdate processRouteConfiguration( @@ -1370,7 +1438,9 @@ private static RdsUpdate processRouteConfiguration( } @Override - protected void handleCdsResponse(String versionInfo, List resources, String nonce) { + public void handleCdsResponse( + ServerInfo serverInfo, String versionInfo, List resources, String nonce) { + syncContext.throwIfNotInThisSynchronizationContext(); Map parsedResources = new HashMap<>(resources.size()); Set unpackedResources = new HashSet<>(resources.size()); Set invalidResources = new HashSet<>(); @@ -1415,12 +1485,12 @@ protected void handleCdsResponse(String versionInfo, List resources, String } parsedResources.put(clusterName, new ParsedResource(cdsUpdate, resource)); } - getLogger().log(XdsLogLevel.INFO, + logger.log(XdsLogLevel.INFO, "Received CDS Response version {0} nonce {1}. Parsed resources: {2}", versionInfo, nonce, unpackedResources); handleResourceUpdate( - ResourceType.CDS, parsedResources, invalidResources, retainedEdsResources, versionInfo, - nonce, errors); + serverInfo, ResourceType.CDS, parsedResources, invalidResources, retainedEdsResources, + versionInfo, nonce, errors); } @VisibleForTesting @@ -1598,7 +1668,9 @@ private static StructOrError parseNonAggregateCluster( } @Override - protected void handleEdsResponse(String versionInfo, List resources, String nonce) { + public void handleEdsResponse( + ServerInfo serverInfo, String versionInfo, List resources, String nonce) { + syncContext.throwIfNotInThisSynchronizationContext(); Map parsedResources = new HashMap<>(resources.size()); Set unpackedResources = new HashSet<>(resources.size()); Set invalidResources = new HashSet<>(); @@ -1641,12 +1713,12 @@ protected void handleEdsResponse(String versionInfo, List resources, String } parsedResources.put(clusterName, new ParsedResource(edsUpdate, resource)); } - getLogger().log( + logger.log( XdsLogLevel.INFO, "Received EDS Response version {0} nonce {1}. Parsed resources: {2}", versionInfo, nonce, unpackedResources); handleResourceUpdate( - ResourceType.EDS, parsedResources, invalidResources, Collections.emptySet(), - versionInfo, nonce, errors); + serverInfo, ResourceType.EDS, parsedResources, invalidResources, + Collections.emptySet(), versionInfo, nonce, errors); } private static EdsUpdate processClusterLoadAssignment(ClusterLoadAssignment assignment) @@ -1775,7 +1847,8 @@ private static int getRatePerMillion(FractionalPercent percent) { } @Override - protected void handleStreamClosed(Status error) { + public void handleStreamClosed(Status error) { + syncContext.throwIfNotInThisSynchronizationContext(); cleanUpResourceTimers(); for (ResourceSubscriber subscriber : ldsResourceSubscribers.values()) { subscriber.onError(error); @@ -1792,27 +1865,56 @@ protected void handleStreamClosed(Status error) { } @Override - protected void handleStreamRestarted() { + public void handleStreamRestarted(ServerInfo serverInfo) { + syncContext.throwIfNotInThisSynchronizationContext(); for (ResourceSubscriber subscriber : ldsResourceSubscribers.values()) { - subscriber.restartTimer(); + if (subscriber.serverInfo.equals(serverInfo)) { + subscriber.restartTimer(); + } } for (ResourceSubscriber subscriber : rdsResourceSubscribers.values()) { - subscriber.restartTimer(); + if (subscriber.serverInfo.equals(serverInfo)) { + subscriber.restartTimer(); + } } for (ResourceSubscriber subscriber : cdsResourceSubscribers.values()) { - subscriber.restartTimer(); + if (subscriber.serverInfo.equals(serverInfo)) { + subscriber.restartTimer(); + } } for (ResourceSubscriber subscriber : edsResourceSubscribers.values()) { - subscriber.restartTimer(); + if (subscriber.serverInfo.equals(serverInfo)) { + subscriber.restartTimer(); + } } } @Override - protected void handleShutdown() { - if (reportingLoad) { - lrsClient.stopLoadReporting(); - } - cleanUpResourceTimers(); + void shutdown() { + syncContext.execute( + new Runnable() { + @Override + public void run() { + if (isShutdown) { + return; + } + isShutdown = true; + for (AbstractXdsClient xdsChannel : serverChannelMap.values()) { + xdsChannel.shutdown(); + } + if (reportingLoad) { + for (final LoadReportClient lrsClient : serverLrsClientMap.values()) { + lrsClient.stopLoadReporting(); + } + } + cleanUpResourceTimers(); + } + }); + } + + @Override + boolean isShutDown() { + return isShutdown; } private Map getSubscribedResourcesMap(ResourceType type) { @@ -1833,9 +1935,16 @@ private Map getSubscribedResourcesMap(ResourceType t @Nullable @Override - Collection getSubscribedResources(ResourceType type) { + public Collection getSubscribedResources(ServerInfo serverInfo, ResourceType type) { Map resources = getSubscribedResourcesMap(type); - return resources.isEmpty() ? null : resources.keySet(); + ImmutableSet.Builder builder = ImmutableSet.builder(); + for (String key : resources.keySet()) { + if (resources.get(key).serverInfo.equals(serverInfo)) { + builder.add(key); + } + } + Collection retVal = builder.build(); + return retVal.isEmpty() ? null : retVal; } @Override @@ -1854,15 +1963,15 @@ TlsContextManager getTlsContextManager() { @Override void watchLdsResource(final String resourceName, final LdsResourceWatcher watcher) { - getSyncContext().execute(new Runnable() { + syncContext.execute(new Runnable() { @Override public void run() { ResourceSubscriber subscriber = ldsResourceSubscribers.get(resourceName); if (subscriber == null) { - getLogger().log(XdsLogLevel.INFO, "Subscribe LDS resource {0}", resourceName); + logger.log(XdsLogLevel.INFO, "Subscribe LDS resource {0}", resourceName); subscriber = new ResourceSubscriber(ResourceType.LDS, resourceName); ldsResourceSubscribers.put(resourceName, subscriber); - adjustResourceSubscription(ResourceType.LDS); + subscriber.xdsChannel.adjustResourceSubscription(ResourceType.LDS); } subscriber.addWatcher(watcher); } @@ -1871,16 +1980,16 @@ public void run() { @Override void cancelLdsResourceWatch(final String resourceName, final LdsResourceWatcher watcher) { - getSyncContext().execute(new Runnable() { + syncContext.execute(new Runnable() { @Override public void run() { ResourceSubscriber subscriber = ldsResourceSubscribers.get(resourceName); subscriber.removeWatcher(watcher); if (!subscriber.isWatched()) { subscriber.stopTimer(); - getLogger().log(XdsLogLevel.INFO, "Unsubscribe LDS resource {0}", resourceName); + logger.log(XdsLogLevel.INFO, "Unsubscribe LDS resource {0}", resourceName); ldsResourceSubscribers.remove(resourceName); - adjustResourceSubscription(ResourceType.LDS); + subscriber.xdsChannel.adjustResourceSubscription(ResourceType.LDS); } } }); @@ -1888,15 +1997,15 @@ public void run() { @Override void watchRdsResource(final String resourceName, final RdsResourceWatcher watcher) { - getSyncContext().execute(new Runnable() { + syncContext.execute(new Runnable() { @Override public void run() { ResourceSubscriber subscriber = rdsResourceSubscribers.get(resourceName); if (subscriber == null) { - getLogger().log(XdsLogLevel.INFO, "Subscribe RDS resource {0}", resourceName); + logger.log(XdsLogLevel.INFO, "Subscribe RDS resource {0}", resourceName); subscriber = new ResourceSubscriber(ResourceType.RDS, resourceName); rdsResourceSubscribers.put(resourceName, subscriber); - adjustResourceSubscription(ResourceType.RDS); + subscriber.xdsChannel.adjustResourceSubscription(ResourceType.RDS); } subscriber.addWatcher(watcher); } @@ -1905,16 +2014,16 @@ public void run() { @Override void cancelRdsResourceWatch(final String resourceName, final RdsResourceWatcher watcher) { - getSyncContext().execute(new Runnable() { + syncContext.execute(new Runnable() { @Override public void run() { ResourceSubscriber subscriber = rdsResourceSubscribers.get(resourceName); subscriber.removeWatcher(watcher); if (!subscriber.isWatched()) { subscriber.stopTimer(); - getLogger().log(XdsLogLevel.INFO, "Unsubscribe RDS resource {0}", resourceName); + logger.log(XdsLogLevel.INFO, "Unsubscribe RDS resource {0}", resourceName); rdsResourceSubscribers.remove(resourceName); - adjustResourceSubscription(ResourceType.RDS); + subscriber.xdsChannel.adjustResourceSubscription(ResourceType.RDS); } } }); @@ -1922,15 +2031,15 @@ public void run() { @Override void watchCdsResource(final String resourceName, final CdsResourceWatcher watcher) { - getSyncContext().execute(new Runnable() { + syncContext.execute(new Runnable() { @Override public void run() { ResourceSubscriber subscriber = cdsResourceSubscribers.get(resourceName); if (subscriber == null) { - getLogger().log(XdsLogLevel.INFO, "Subscribe CDS resource {0}", resourceName); + logger.log(XdsLogLevel.INFO, "Subscribe CDS resource {0}", resourceName); subscriber = new ResourceSubscriber(ResourceType.CDS, resourceName); cdsResourceSubscribers.put(resourceName, subscriber); - adjustResourceSubscription(ResourceType.CDS); + subscriber.xdsChannel.adjustResourceSubscription(ResourceType.CDS); } subscriber.addWatcher(watcher); } @@ -1939,16 +2048,16 @@ public void run() { @Override void cancelCdsResourceWatch(final String resourceName, final CdsResourceWatcher watcher) { - getSyncContext().execute(new Runnable() { + syncContext.execute(new Runnable() { @Override public void run() { ResourceSubscriber subscriber = cdsResourceSubscribers.get(resourceName); subscriber.removeWatcher(watcher); if (!subscriber.isWatched()) { subscriber.stopTimer(); - getLogger().log(XdsLogLevel.INFO, "Unsubscribe CDS resource {0}", resourceName); + logger.log(XdsLogLevel.INFO, "Unsubscribe CDS resource {0}", resourceName); cdsResourceSubscribers.remove(resourceName); - adjustResourceSubscription(ResourceType.CDS); + subscriber.xdsChannel.adjustResourceSubscription(ResourceType.CDS); } } }); @@ -1956,15 +2065,15 @@ public void run() { @Override void watchEdsResource(final String resourceName, final EdsResourceWatcher watcher) { - getSyncContext().execute(new Runnable() { + syncContext.execute(new Runnable() { @Override public void run() { ResourceSubscriber subscriber = edsResourceSubscribers.get(resourceName); if (subscriber == null) { - getLogger().log(XdsLogLevel.INFO, "Subscribe EDS resource {0}", resourceName); + logger.log(XdsLogLevel.INFO, "Subscribe EDS resource {0}", resourceName); subscriber = new ResourceSubscriber(ResourceType.EDS, resourceName); edsResourceSubscribers.put(resourceName, subscriber); - adjustResourceSubscription(ResourceType.EDS); + subscriber.xdsChannel.adjustResourceSubscription(ResourceType.EDS); } subscriber.addWatcher(watcher); } @@ -1973,30 +2082,32 @@ public void run() { @Override void cancelEdsResourceWatch(final String resourceName, final EdsResourceWatcher watcher) { - getSyncContext().execute(new Runnable() { + syncContext.execute(new Runnable() { @Override public void run() { ResourceSubscriber subscriber = edsResourceSubscribers.get(resourceName); subscriber.removeWatcher(watcher); if (!subscriber.isWatched()) { subscriber.stopTimer(); - getLogger().log(XdsLogLevel.INFO, "Unsubscribe EDS resource {0}", resourceName); + logger.log(XdsLogLevel.INFO, "Unsubscribe EDS resource {0}", resourceName); edsResourceSubscribers.remove(resourceName); - adjustResourceSubscription(ResourceType.EDS); + subscriber.xdsChannel.adjustResourceSubscription(ResourceType.EDS); } } }); } @Override - ClusterDropStats addClusterDropStats(String clusterName, @Nullable String edsServiceName) { + ClusterDropStats addClusterDropStats( + String clusterName, @Nullable String edsServiceName) { ClusterDropStats dropCounter = loadStatsManager.getClusterDropStats(clusterName, edsServiceName); - getSyncContext().execute(new Runnable() { + syncContext.execute(new Runnable() { @Override public void run() { if (!reportingLoad) { - lrsClient.startLoadReporting(); + // TODO(https://github.com/grpc/grpc-java/issues/8628): consume ServerInfo arg. + serverLrsClientMap.values().iterator().next().startLoadReporting(); reportingLoad = true; } } @@ -2005,15 +2116,17 @@ public void run() { } @Override - ClusterLocalityStats addClusterLocalityStats(String clusterName, - @Nullable String edsServiceName, Locality locality) { + ClusterLocalityStats addClusterLocalityStats( + String clusterName, @Nullable String edsServiceName, + Locality locality) { ClusterLocalityStats loadCounter = loadStatsManager.getClusterLocalityStats(clusterName, edsServiceName, locality); - getSyncContext().execute(new Runnable() { + syncContext.execute(new Runnable() { @Override public void run() { if (!reportingLoad) { - lrsClient.startLoadReporting(); + // TODO(https://github.com/grpc/grpc-java/issues/8628): consume ServerInfo arg. + serverLrsClientMap.values().iterator().next().startLoadReporting(); reportingLoad = true; } } @@ -2021,6 +2134,25 @@ public void run() { return loadCounter; } + @Override + Bootstrapper.BootstrapInfo getBootstrapInfo() { + return bootstrapInfo; + } + + // TODO(https://github.com/grpc/grpc-java/issues/8629): remove this + @Override + String getCurrentVersion(ResourceType type) { + if (serverChannelMap.isEmpty()) { + return ""; + } + return serverChannelMap.values().iterator().next().getCurrentVersion(type); + } + + @Override + public String toString() { + return logId.toString(); + } + private void cleanUpResourceTimers() { for (ResourceSubscriber subscriber : ldsResourceSubscribers.values()) { subscriber.stopTimer(); @@ -2037,18 +2169,19 @@ private void cleanUpResourceTimers() { } private void handleResourceUpdate( - ResourceType type, Map parsedResources, Set invalidResources, - Set retainedResources, String version, String nonce, List errors) { + ServerInfo serverInfo, ResourceType type, Map parsedResources, + Set invalidResources, Set retainedResources, String version, String nonce, + List errors) { String errorDetail = null; if (errors.isEmpty()) { checkArgument(invalidResources.isEmpty(), "found invalid resources but missing errors"); - ackResponse(type, version, nonce); + serverChannelMap.get(serverInfo).ackResponse(type, version, nonce); } else { errorDetail = Joiner.on('\n').join(errors); - getLogger().log(XdsLogLevel.WARNING, + logger.log(XdsLogLevel.WARNING, "Failed processing {0} Response version {1} nonce {2}. Errors:\n{3}", type, version, nonce, errorDetail); - nackResponse(type, nonce, errorDetail); + serverChannelMap.get(serverInfo).nackResponse(type, nonce, errorDetail); } long updateTime = timeProvider.currentTimeNanos(); for (Map.Entry entry : getSubscribedResourcesMap(type).entrySet()) { @@ -2123,6 +2256,8 @@ private Any getRawResource() { * Tracks a single subscribed resource. */ private final class ResourceSubscriber { + private final ServerInfo serverInfo; + private final AbstractXdsClient xdsChannel; private final ResourceType type; private final String resource; private final Set watchers = new HashSet<>(); @@ -2132,17 +2267,26 @@ private final class ResourceSubscriber { private ResourceMetadata metadata; ResourceSubscriber(ResourceType type, String resource) { + syncContext.throwIfNotInThisSynchronizationContext(); this.type = type; this.resource = resource; + this.serverInfo = getServerInfo(); // Initialize metadata in UNKNOWN state to cover the case when resource subscriber, // is created but not yet requested because the client is in backoff. this.metadata = ResourceMetadata.newResourceMetadataUnknown(); - if (isInBackoff()) { + maybeCreateXdsChannelWithLrs(serverInfo); + this.xdsChannel = serverChannelMap.get(serverInfo); + if (xdsChannel.isInBackoff()) { return; } restartTimer(); } + // TODO(zdapeng): add resourceName arg and support xdstp:// resources + private ServerInfo getServerInfo() { + return bootstrapInfo.servers().get(0); // use first server + } + void addWatcher(ResourceWatcher watcher) { checkArgument(!watchers.contains(watcher), "watcher %s already registered", watcher); watchers.add(watcher); @@ -2165,7 +2309,7 @@ void restartTimer() { class ResourceNotFound implements Runnable { @Override public void run() { - getLogger().log(XdsLogLevel.INFO, "{0} resource {1} initial fetch timeout", + logger.log(XdsLogLevel.INFO, "{0} resource {1} initial fetch timeout", type, resource); respTimer = null; onAbsent(); @@ -2179,9 +2323,9 @@ public String toString() { // Initial fetch scheduled or rescheduled, transition metadata state to REQUESTED. metadata = ResourceMetadata.newResourceMetadataRequested(); - respTimer = getSyncContext().schedule( + respTimer = syncContext.schedule( new ResourceNotFound(), INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, - getTimeService()); + timeService); } void stopTimer() { @@ -2216,7 +2360,7 @@ void onAbsent() { if (respTimer != null && respTimer.isPending()) { // too early to conclude absence return; } - getLogger().log(XdsLogLevel.INFO, "Conclude {0} resource {1} not exist", type, resource); + logger.log(XdsLogLevel.INFO, "Conclude {0} resource {1} not exist", type, resource); if (!absent) { data = null; absent = true; @@ -2324,4 +2468,19 @@ String getErrorDetail() { return errorDetail; } } + + abstract static class XdsChannelFactory { + static final XdsChannelFactory DEFAULT_XDS_CHANNEL_FACTORY = new XdsChannelFactory() { + @Override + ManagedChannel create(ServerInfo serverInfo) { + String target = serverInfo.target(); + ChannelCredentials channelCredentials = serverInfo.channelCredentials(); + return Grpc.newChannelBuilder(target, channelCredentials) + .keepAliveTime(5, TimeUnit.MINUTES) + .build(); + } + }; + + abstract ManagedChannel create(ServerInfo serverInfo); + } } diff --git a/xds/src/main/java/io/grpc/xds/LoadReportClient.java b/xds/src/main/java/io/grpc/xds/LoadReportClient.java index 54fa20128bc0..af2a673e9f7b 100644 --- a/xds/src/main/java/io/grpc/xds/LoadReportClient.java +++ b/xds/src/main/java/io/grpc/xds/LoadReportClient.java @@ -28,9 +28,9 @@ import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc.LoadReportingServiceStub; import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest; import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse; +import io.grpc.Channel; import io.grpc.Context; import io.grpc.InternalLogId; -import io.grpc.ManagedChannel; import io.grpc.Status; import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext.ScheduledHandle; @@ -55,7 +55,7 @@ final class LoadReportClient { private final InternalLogId logId; private final XdsLogger logger; - private final ManagedChannel channel; + private final Channel channel; private final Context context; private final boolean useProtocolV3; private final Node node; @@ -75,7 +75,7 @@ final class LoadReportClient { LoadReportClient( LoadStatsManager2 loadStatsManager, - ManagedChannel channel, + Channel channel, Context context, boolean useProtocolV3, Node node, diff --git a/xds/src/main/java/io/grpc/xds/MessagePrinter.java b/xds/src/main/java/io/grpc/xds/MessagePrinter.java index edddcb7a4658..249e1f0e9659 100644 --- a/xds/src/main/java/io/grpc/xds/MessagePrinter.java +++ b/xds/src/main/java/io/grpc/xds/MessagePrinter.java @@ -38,43 +38,49 @@ * containing {@link com.google.protobuf.Any} fields. */ final class MessagePrinter { - private final JsonFormat.Printer printer; - MessagePrinter() { - TypeRegistry registry = - TypeRegistry.newBuilder() - .add(Listener.getDescriptor()) - .add(io.envoyproxy.envoy.api.v2.Listener.getDescriptor()) - .add(HttpConnectionManager.getDescriptor()) - .add(io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2 - .HttpConnectionManager.getDescriptor()) - .add(HTTPFault.getDescriptor()) - .add(io.envoyproxy.envoy.config.filter.http.fault.v2.HTTPFault.getDescriptor()) - .add(RBAC.getDescriptor()) - .add(RBACPerRoute.getDescriptor()) - .add(Router.getDescriptor()) - .add(io.envoyproxy.envoy.config.filter.http.router.v2.Router.getDescriptor()) - // UpstreamTlsContext and DownstreamTlsContext in v3 are not transitively imported - // by top-level resource types. - .add(UpstreamTlsContext.getDescriptor()) - .add(DownstreamTlsContext.getDescriptor()) - .add(RouteConfiguration.getDescriptor()) - .add(io.envoyproxy.envoy.api.v2.RouteConfiguration.getDescriptor()) - .add(Cluster.getDescriptor()) - .add(io.envoyproxy.envoy.api.v2.Cluster.getDescriptor()) - .add(ClusterConfig.getDescriptor()) - .add(io.envoyproxy.envoy.config.cluster.aggregate.v2alpha.ClusterConfig - .getDescriptor()) - .add(ClusterLoadAssignment.getDescriptor()) - .add(io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.getDescriptor()) - .build(); - printer = JsonFormat.printer().usingTypeRegistry(registry); + private MessagePrinter() {} + + // The initialization-on-demand holder idiom. + private static class LazyHolder { + static final JsonFormat.Printer printer = newPrinter(); + + private static JsonFormat.Printer newPrinter() { + TypeRegistry registry = + TypeRegistry.newBuilder() + .add(Listener.getDescriptor()) + .add(io.envoyproxy.envoy.api.v2.Listener.getDescriptor()) + .add(HttpConnectionManager.getDescriptor()) + .add(io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2 + .HttpConnectionManager.getDescriptor()) + .add(HTTPFault.getDescriptor()) + .add(io.envoyproxy.envoy.config.filter.http.fault.v2.HTTPFault.getDescriptor()) + .add(RBAC.getDescriptor()) + .add(RBACPerRoute.getDescriptor()) + .add(Router.getDescriptor()) + .add(io.envoyproxy.envoy.config.filter.http.router.v2.Router.getDescriptor()) + // UpstreamTlsContext and DownstreamTlsContext in v3 are not transitively imported + // by top-level resource types. + .add(UpstreamTlsContext.getDescriptor()) + .add(DownstreamTlsContext.getDescriptor()) + .add(RouteConfiguration.getDescriptor()) + .add(io.envoyproxy.envoy.api.v2.RouteConfiguration.getDescriptor()) + .add(Cluster.getDescriptor()) + .add(io.envoyproxy.envoy.api.v2.Cluster.getDescriptor()) + .add(ClusterConfig.getDescriptor()) + .add(io.envoyproxy.envoy.config.cluster.aggregate.v2alpha.ClusterConfig + .getDescriptor()) + .add(ClusterLoadAssignment.getDescriptor()) + .add(io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.getDescriptor()) + .build(); + return JsonFormat.printer().usingTypeRegistry(registry); + } } - String print(MessageOrBuilder message) { + static String print(MessageOrBuilder message) { String res; try { - res = printer.print(message); + res = LazyHolder.printer.print(message); } catch (InvalidProtocolBufferException e) { res = message + " (failed to pretty-print: " + e + ")"; } diff --git a/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java b/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java index 14bdced5dac5..1c8fe0bad6d2 100644 --- a/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java +++ b/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java @@ -19,22 +19,18 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.annotations.VisibleForTesting; -import io.grpc.ChannelCredentials; import io.grpc.Context; -import io.grpc.Grpc; -import io.grpc.ManagedChannel; import io.grpc.internal.ExponentialBackoffPolicy; import io.grpc.internal.GrpcUtil; import io.grpc.internal.ObjectPool; import io.grpc.internal.SharedResourceHolder; import io.grpc.internal.TimeProvider; import io.grpc.xds.Bootstrapper.BootstrapInfo; -import io.grpc.xds.Bootstrapper.ServerInfo; +import io.grpc.xds.ClientXdsClient.XdsChannelFactory; import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory; import io.grpc.xds.internal.sds.TlsContextManagerImpl; import java.util.Map; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; @@ -113,8 +109,6 @@ static class RefCountedXdsClientObjectPool implements ObjectPool { @GuardedBy("lock") private ScheduledExecutorService scheduler; @GuardedBy("lock") - private ManagedChannel channel; - @GuardedBy("lock") private XdsClient xdsClient; @GuardedBy("lock") private int refCount; @@ -128,16 +122,16 @@ static class RefCountedXdsClientObjectPool implements ObjectPool { public XdsClient getObject() { synchronized (lock) { if (refCount == 0) { - ServerInfo serverInfo = bootstrapInfo.servers().get(0); // use first server - String target = serverInfo.target(); - ChannelCredentials channelCredentials = serverInfo.channelCredentials(); - channel = Grpc.newChannelBuilder(target, channelCredentials) - .keepAliveTime(5, TimeUnit.MINUTES) - .build(); scheduler = SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE); - xdsClient = new ClientXdsClient(channel, bootstrapInfo, context, scheduler, - new ExponentialBackoffPolicy.Provider(), GrpcUtil.STOPWATCH_SUPPLIER, - TimeProvider.SYSTEM_TIME_PROVIDER, new TlsContextManagerImpl(bootstrapInfo)); + xdsClient = new ClientXdsClient( + XdsChannelFactory.DEFAULT_XDS_CHANNEL_FACTORY, + bootstrapInfo, + context, + scheduler, + new ExponentialBackoffPolicy.Provider(), + GrpcUtil.STOPWATCH_SUPPLIER, + TimeProvider.SYSTEM_TIME_PROVIDER, + new TlsContextManagerImpl(bootstrapInfo)); } refCount++; return xdsClient; @@ -151,21 +145,12 @@ public XdsClient returnObject(Object object) { if (refCount == 0) { xdsClient.shutdown(); xdsClient = null; - channel.shutdown(); scheduler = SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, scheduler); } return null; } } - @VisibleForTesting - @Nullable - ManagedChannel getChannelForTest() { - synchronized (lock) { - return channel; - } - } - @VisibleForTesting @Nullable XdsClient getXdsClientForTest() { diff --git a/xds/src/main/java/io/grpc/xds/XdsClient.java b/xds/src/main/java/io/grpc/xds/XdsClient.java index 6b6be57f0429..1daa257e54ea 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/XdsClient.java @@ -24,6 +24,7 @@ import com.google.protobuf.Any; import io.grpc.Status; import io.grpc.xds.AbstractXdsClient.ResourceType; +import io.grpc.xds.Bootstrapper.ServerInfo; import io.grpc.xds.Endpoints.DropOverload; import io.grpc.xds.Endpoints.LocalityLbEndpoints; import io.grpc.xds.EnvoyServerProtoData.Listener; @@ -31,6 +32,7 @@ import io.grpc.xds.LoadStatsManager2.ClusterDropStats; import io.grpc.xds.LoadStatsManager2.ClusterLocalityStats; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; @@ -495,6 +497,7 @@ TlsContextManager getTlsContextManager() { /** * Returns the latest accepted version of the given resource type. */ + // TODO(https://github.com/grpc/grpc-java/issues/8629): remove this String getCurrentVersion(ResourceType type) { throw new UnsupportedOperationException(); } @@ -566,6 +569,7 @@ void cancelEdsResourceWatch(String resourceName, EdsResourceWatcher watcher) { * use {@link ClusterDropStats#release} to release its hard reference when it is safe to * stop reporting dropped RPCs for the specified cluster in the future. */ + // TODO(https://github.com/grpc/grpc-java/issues/8628): add ServerInfo arg ClusterDropStats addClusterDropStats(String clusterName, @Nullable String edsServiceName) { throw new UnsupportedOperationException(); } @@ -578,8 +582,48 @@ ClusterDropStats addClusterDropStats(String clusterName, @Nullable String edsSer * reference when it is safe to stop reporting RPC loads for the specified locality in the * future. */ + // TODO(https://github.com/grpc/grpc-java/issues/8628): add ServerInfo arg ClusterLocalityStats addClusterLocalityStats( String clusterName, @Nullable String edsServiceName, Locality locality) { throw new UnsupportedOperationException(); } + + interface XdsResponseHandler { + /** Called when an LDS response is received. */ + void handleLdsResponse( + ServerInfo serverInfo, String versionInfo, List resources, String nonce); + + /** Called when an RDS response is received. */ + void handleRdsResponse( + ServerInfo serverInfo, String versionInfo, List resources, String nonce); + + /** Called when an CDS response is received. */ + void handleCdsResponse( + ServerInfo serverInfo, String versionInfo, List resources, String nonce); + + /** Called when an EDS response is received. */ + void handleEdsResponse( + ServerInfo serverInfo, String versionInfo, List resources, String nonce); + + /** Called when the ADS stream is closed passively. */ + // Must be synchronized. + void handleStreamClosed(Status error); + + /** Called when the ADS stream has been recreated. */ + // Must be synchronized. + void handleStreamRestarted(ServerInfo serverInfo); + } + + interface ResourceStore { + /** + * Returns the collection of resources currently subscribing to or {@code null} if not + * subscribing to any resources for the given type. + * + *

Note an empty collection indicates subscribing to resources of the given type with + * wildcard mode. + */ + // Must be synchronized. + @Nullable + Collection getSubscribedResources(ServerInfo serverInfo, ResourceType type); + } } diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java index 5fa9e3da734a..9809738c68d6 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java @@ -59,6 +59,8 @@ import io.grpc.testing.GrpcCleanupRule; import io.grpc.xds.AbstractXdsClient.ResourceType; import io.grpc.xds.Bootstrapper.CertificateProviderInfo; +import io.grpc.xds.Bootstrapper.ServerInfo; +import io.grpc.xds.ClientXdsClient.XdsChannelFactory; import io.grpc.xds.Endpoints.DropOverload; import io.grpc.xds.Endpoints.LbEndpoint; import io.grpc.xds.Endpoints.LocalityLbEndpoints; @@ -272,6 +274,12 @@ public void setUp() throws IOException { .start()); channel = cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()); + XdsChannelFactory xdsChannelFactory = new XdsChannelFactory() { + @Override + ManagedChannel create(ServerInfo serverInfo) { + return channel; + } + }; Bootstrapper.BootstrapInfo bootstrapInfo = Bootstrapper.BootstrapInfo.builder() @@ -284,7 +292,7 @@ public void setUp() throws IOException { .build(); xdsClient = new ClientXdsClient( - channel, + xdsChannelFactory, bootstrapInfo, Context.ROOT, fakeClock.getScheduledExecutorService(), @@ -2325,6 +2333,7 @@ public void streamClosedAndRetryRestartsResourceInitialFetchTimerForUnresolvedRe @Test public void reportLoadStatsToServer() { + xdsClient.watchLdsResource(LDS_RESOURCE, ldsResourceWatcher); String clusterName = "cluster-foo.googleapis.com"; ClusterDropStats dropStats = xdsClient.addClusterDropStats(clusterName, null); LrsRpcCall lrsCall = loadReportCalls.poll(); diff --git a/xds/src/test/java/io/grpc/xds/MessagePrinterTest.java b/xds/src/test/java/io/grpc/xds/MessagePrinterTest.java index bb6c5517ab99..7b6942583756 100644 --- a/xds/src/test/java/io/grpc/xds/MessagePrinterTest.java +++ b/xds/src/test/java/io/grpc/xds/MessagePrinterTest.java @@ -64,7 +64,6 @@ */ @RunWith(JUnit4.class) public class MessagePrinterTest { - private final MessagePrinter printer = new MessagePrinter(); @Test public void printLdsResponse_v3() { @@ -151,7 +150,7 @@ public void printLdsResponse_v3() { + " \"typeUrl\": \"type.googleapis.com/envoy.config.listener.v3.Listener\",\n" + " \"nonce\": \"0000\"\n" + "}"; - String res = printer.print(response); + String res = MessagePrinter.print(response); assertThat(res).isEqualTo(expectedString); } @@ -202,7 +201,7 @@ public void printRdsResponse_v3() { + " \"typeUrl\": \"type.googleapis.com/envoy.config.route.v3.RouteConfiguration\",\n" + " \"nonce\": \"0000\"\n" + "}"; - String res = printer.print(response); + String res = MessagePrinter.print(response); assertThat(res).isEqualTo(expectedString); } @@ -267,7 +266,7 @@ public void printCdsResponse_v3() { + " \"typeUrl\": \"type.googleapis.com/envoy.config.cluster.v3.Cluster\",\n" + " \"nonce\": \"0000\"\n" + "}"; - String res = printer.print(response); + String res = MessagePrinter.print(response); assertThat(res).isEqualTo(expectedString); } @@ -356,7 +355,7 @@ public void printEdsResponse_v3() { + ".ClusterLoadAssignment\",\n" + " \"nonce\": \"0000\"\n" + "}"; - String res = printer.print(response); + String res = MessagePrinter.print(response); assertThat(res).isEqualTo(expectedString); } } diff --git a/xds/src/test/java/io/grpc/xds/SharedXdsClientPoolProviderTest.java b/xds/src/test/java/io/grpc/xds/SharedXdsClientPoolProviderTest.java index 6a3cba4ac351..14a8f1ce7438 100644 --- a/xds/src/test/java/io/grpc/xds/SharedXdsClientPoolProviderTest.java +++ b/xds/src/test/java/io/grpc/xds/SharedXdsClientPoolProviderTest.java @@ -23,7 +23,6 @@ import static org.mockito.Mockito.when; import io.grpc.InsecureChannelCredentials; -import io.grpc.ManagedChannel; import io.grpc.internal.ObjectPool; import io.grpc.xds.Bootstrapper.BootstrapInfo; import io.grpc.xds.Bootstrapper.ServerInfo; @@ -90,7 +89,6 @@ public void refCountedXdsClientObjectPool_delayedCreation() { BootstrapInfo.builder().servers(Collections.singletonList(server)).node(node).build(); RefCountedXdsClientObjectPool xdsClientPool = new RefCountedXdsClientObjectPool(bootstrapInfo); assertThat(xdsClientPool.getXdsClientForTest()).isNull(); - assertThat(xdsClientPool.getChannelForTest()).isNull(); XdsClient xdsClient = xdsClientPool.getObject(); assertThat(xdsClientPool.getXdsClientForTest()).isNotNull(); xdsClientPool.returnObject(xdsClient); @@ -113,7 +111,6 @@ public void refCountedXdsClientObjectPool_refCounted() { // returnObject twice assertThat(xdsClientPool.returnObject(xdsClient)).isNull(); assertThat(xdsClient.isShutDown()).isTrue(); - assertThat(xdsClientPool.getChannelForTest().isShutdown()).isTrue(); } @Test @@ -123,14 +120,11 @@ public void refCountedXdsClientObjectPool_getObjectCreatesNewInstanceIfAlreadySh BootstrapInfo.builder().servers(Collections.singletonList(server)).node(node).build(); RefCountedXdsClientObjectPool xdsClientPool = new RefCountedXdsClientObjectPool(bootstrapInfo); XdsClient xdsClient1 = xdsClientPool.getObject(); - ManagedChannel channel1 = xdsClientPool.getChannelForTest(); assertThat(xdsClientPool.returnObject(xdsClient1)).isNull(); assertThat(xdsClient1.isShutDown()).isTrue(); - assertThat(channel1.isShutdown()).isTrue(); XdsClient xdsClient2 = xdsClientPool.getObject(); assertThat(xdsClient2).isNotSameInstanceAs(xdsClient1); - assertThat(xdsClientPool.getChannelForTest()).isNotSameInstanceAs(channel1); xdsClientPool.returnObject(xdsClient2); } }