Skip to content

Commit

Permalink
xds: use a standalone Context for xDS control plane RPCs (manual v1.3…
Browse files Browse the repository at this point in the history
…7.x backport) (#8161)

Control plane RPCs are independent of application RPCs, they can stand for completely different lifetime. So the context for making application RPCs should not be propagated to control plane RPCs.
  • Loading branch information
voidzcy committed May 10, 2021
1 parent d6ab21c commit 85d1b9f
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 9 deletions.
12 changes: 10 additions & 2 deletions xds/src/main/java/io/grpc/xds/AbstractXdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
import io.grpc.Context;
import io.grpc.InternalLogId;
import io.grpc.ManagedChannel;
import io.grpc.Status;
Expand Down Expand Up @@ -82,6 +83,7 @@ public void uncaughtException(Thread t, Throwable e) {
private final InternalLogId logId;
private final XdsLogger logger;
private final ManagedChannel channel;
private final Context context;
private final boolean useProtocolV3;
private final ScheduledExecutorService timeService;
private final BackoffPolicy.Provider backoffPolicyProvider;
Expand Down Expand Up @@ -109,10 +111,11 @@ public void uncaughtException(Thread t, Throwable e) {
@Nullable
private ScheduledHandle rpcRetryTimer;

AbstractXdsClient(ManagedChannel channel, boolean useProtocolV3, Node node,
AbstractXdsClient(ManagedChannel channel, Context context, boolean useProtocolV3, Node node,
ScheduledExecutorService timeService, BackoffPolicy.Provider backoffPolicyProvider,
Supplier<Stopwatch> stopwatchSupplier) {
this.channel = checkNotNull(channel, "channel");
this.context = checkNotNull(context, "context");
this.useProtocolV3 = useProtocolV3;
this.node = checkNotNull(node, "node");
this.timeService = checkNotNull(timeService, "timeService");
Expand Down Expand Up @@ -313,7 +316,12 @@ private void startRpcStream() {
} else {
adsStream = new AdsStreamV2();
}
adsStream.start();
Context prevContext = context.attach();
try {
adsStream.start();
} finally {
context.detach(prevContext);
}
logger.log(XdsLogLevel.INFO, "ADS stream started");
stopwatch.reset().start();
}
Expand Down
8 changes: 5 additions & 3 deletions xds/src/main/java/io/grpc/xds/ClientXdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.Rds;
import io.envoyproxy.envoy.type.v3.FractionalPercent;
import io.envoyproxy.envoy.type.v3.FractionalPercent.DenominatorType;
import io.grpc.Context;
import io.grpc.EquivalentAddressGroup;
import io.grpc.ManagedChannel;
import io.grpc.Status;
Expand Down Expand Up @@ -134,13 +135,14 @@ final class ClientXdsClient extends AbstractXdsClient {
private boolean reportingLoad;

ClientXdsClient(
ManagedChannel channel, boolean useProtocolV3, Node node,
ManagedChannel channel, Context context, boolean useProtocolV3, Node node,
ScheduledExecutorService timeService, BackoffPolicy.Provider backoffPolicyProvider,
Supplier<Stopwatch> stopwatchSupplier, TimeProvider timeProvider) {
super(channel, useProtocolV3, node, timeService, backoffPolicyProvider, stopwatchSupplier);
super(channel, context, useProtocolV3, node, timeService, backoffPolicyProvider,
stopwatchSupplier);
loadStatsManager = new LoadStatsManager2(stopwatchSupplier);
this.timeProvider = timeProvider;
lrsClient = new LoadReportClient(loadStatsManager, channel, useProtocolV3, node,
lrsClient = new LoadReportClient(loadStatsManager, channel, context, useProtocolV3, node,
getSyncContext(), timeService, backoffPolicyProvider, stopwatchSupplier);
}

Expand Down
11 changes: 10 additions & 1 deletion xds/src/main/java/io/grpc/xds/LoadReportClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc.LoadReportingServiceStub;
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest;
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse;
import io.grpc.Context;
import io.grpc.InternalLogId;
import io.grpc.ManagedChannel;
import io.grpc.Status;
Expand Down Expand Up @@ -55,6 +56,7 @@ final class LoadReportClient {
private final InternalLogId logId;
private final XdsLogger logger;
private final ManagedChannel channel;
private final Context context;
private final boolean useProtocolV3;
private final Node node;
private final SynchronizationContext syncContext;
Expand All @@ -74,6 +76,7 @@ final class LoadReportClient {
LoadReportClient(
LoadStatsManager2 loadStatsManager,
ManagedChannel channel,
Context context,
boolean useProtocolV3,
Node node,
SynchronizationContext syncContext,
Expand All @@ -82,6 +85,7 @@ final class LoadReportClient {
Supplier<Stopwatch> stopwatchSupplier) {
this.loadStatsManager = checkNotNull(loadStatsManager, "loadStatsManager");
this.channel = checkNotNull(channel, "xdsChannel");
this.context = checkNotNull(context, "context");
this.useProtocolV3 = useProtocolV3;
this.syncContext = checkNotNull(syncContext, "syncContext");
this.timerService = checkNotNull(scheduledExecutorService, "timeService");
Expand Down Expand Up @@ -163,7 +167,12 @@ private void startLrsRpc() {
lrsStream = new LrsStreamV2();
}
retryStopwatch.reset().start();
lrsStream.start();
Context prevContext = context.attach();
try {
lrsStream.start();
} finally {
context.detach(prevContext);
}
}

private abstract class LrsStream {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.annotations.VisibleForTesting;
import io.grpc.ChannelCredentials;
import io.grpc.Context;
import io.grpc.Grpc;
import io.grpc.ManagedChannel;
import io.grpc.internal.ExponentialBackoffPolicy;
Expand Down Expand Up @@ -109,6 +110,7 @@ private static class SharedXdsClientPoolProviderHolder {
@ThreadSafe
@VisibleForTesting
static class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {
private final Context context = Context.ROOT;
private final String target;
private final ChannelCredentials channelCredentials;
private final Node node;
Expand Down Expand Up @@ -140,7 +142,7 @@ public XdsClient getObject() {
.keepAliveTime(5, TimeUnit.MINUTES)
.build();
scheduler = SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE);
xdsClient = new ClientXdsClient(channel, useProtocolV3, node, scheduler,
xdsClient = new ClientXdsClient(channel, context, useProtocolV3, node, scheduler,
new ExponentialBackoffPolicy.Provider(), GrpcUtil.STOPWATCH_SUPPLIER,
TimeProvider.SYSTEM_TIME_PROVIDER);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.protobuf.UInt32Value;
import io.grpc.Context;
import io.grpc.Grpc;
import io.grpc.Internal;
import io.grpc.ManagedChannel;
Expand Down Expand Up @@ -121,6 +122,7 @@ public void createXdsClientAndStart() throws IOException {
XdsClient xdsClientImpl =
new ClientXdsClient(
channel,
Context.ROOT,
serverInfo.isUseProtocolV3(),
node,
timeService,
Expand Down
25 changes: 25 additions & 0 deletions xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import static io.grpc.xds.AbstractXdsClient.ResourceType.EDS;
import static io.grpc.xds.AbstractXdsClient.ResourceType.LDS;
import static io.grpc.xds.AbstractXdsClient.ResourceType.RDS;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
Expand All @@ -40,6 +42,8 @@
import io.envoyproxy.envoy.config.route.v3.FilterConfig;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.SdsSecretConfig;
import io.grpc.BindableService;
import io.grpc.Context;
import io.grpc.Context.CancellableContext;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.Status.Code;
Expand Down Expand Up @@ -258,6 +262,7 @@ public void setUp() throws IOException {
xdsClient =
new ClientXdsClient(
channel,
Context.ROOT,
useProtocolV3(),
EnvoyProtoData.Node.newBuilder().build(),
fakeClock.getScheduledExecutorService(),
Expand Down Expand Up @@ -1766,6 +1771,26 @@ public void multipleEdsWatchers() {
verifySubscribedResourcesMetadataSizes(0, 0, 0, 2);
}

@Test
public void useIndependentRpcContext() {
// Simulates making RPCs within the context of an inbound RPC.
CancellableContext cancellableContext = Context.current().withCancellation();
Context prevContext = cancellableContext.attach();
try {
DiscoveryRpcCall call = startResourceWatcher(LDS, LDS_RESOURCE, ldsResourceWatcher);

// The inbound RPC finishes and closes its context. The outbound RPC's control plane RPC
// should not be impacted.
cancellableContext.close();
verify(ldsResourceWatcher, never()).onError(any(Status.class));

call.sendResponse(LDS, testListenerRds, VERSION_1, "0000");
verify(ldsResourceWatcher).onChanged(any(LdsUpdate.class));
} finally {
cancellableContext.detach(prevContext);
}
}

@Test
public void streamClosedAndRetryWithBackoff() {
InOrder inOrder = Mockito.inOrder(backoffPolicyProvider, backoffPolicy1, backoffPolicy2);
Expand Down
4 changes: 2 additions & 2 deletions xds/src/test/java/io/grpc/xds/LoadReportClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ public void cancelled(Context context) {
when(backoffPolicy2.nextBackoffNanos())
.thenReturn(TimeUnit.SECONDS.toNanos(2L), TimeUnit.SECONDS.toNanos(20L));
addFakeStatsData();
lrsClient = new LoadReportClient(loadStatsManager, channel, false, NODE, syncContext,
fakeClock.getScheduledExecutorService(), backoffPolicyProvider,
lrsClient = new LoadReportClient(loadStatsManager, channel, Context.ROOT, false, NODE,
syncContext, fakeClock.getScheduledExecutorService(), backoffPolicyProvider,
fakeClock.getStopwatchSupplier());
syncContext.execute(new Runnable() {
@Override
Expand Down

0 comments on commit 85d1b9f

Please sign in to comment.