From 3462889c876e8f036dbc884c99684f69ce2dbaba Mon Sep 17 00:00:00 2001 From: Jihun Cho Date: Fri, 17 Apr 2020 16:07:38 -0700 Subject: [PATCH] core: implement Helper#createResolvingOobChannel (#6923) --- api/src/main/java/io/grpc/LoadBalancer.java | 8 ++-- .../io/grpc/internal/ManagedChannelImpl.java | 43 +++++++++++++++++ .../grpc/internal/ManagedChannelImplTest.java | 47 ++++++++++++++----- 3 files changed, 82 insertions(+), 16 deletions(-) diff --git a/api/src/main/java/io/grpc/LoadBalancer.java b/api/src/main/java/io/grpc/LoadBalancer.java index 5682286c7769..d8c7713568df 100644 --- a/api/src/main/java/io/grpc/LoadBalancer.java +++ b/api/src/main/java/io/grpc/LoadBalancer.java @@ -360,7 +360,7 @@ public boolean equals(Object obj) { @Deprecated public void handleSubchannelState( Subchannel subchannel, ConnectivityStateInfo stateInfo) { - // Do nothing. If the implemetation doesn't implement this, it will get subchannel states from + // Do nothing. If the implementation doesn't implement this, it will get subchannel states from // the new API. We don't throw because there may be forwarding LoadBalancers still plumb this. } @@ -507,7 +507,7 @@ private PickResult( * A decision to proceed the RPC on a Subchannel. * *

The Subchannel should either be an original Subchannel returned by {@link - * Helper#createSubchannel Helper.createSubchannel()}, or a wrapper of it preferrably based on + * Helper#createSubchannel Helper.createSubchannel()}, or a wrapper of it preferably based on * {@code ForwardingSubchannel}. At the very least its {@link Subchannel#getInternalSubchannel * getInternalSubchannel()} must return the same object as the one returned by the original. * Otherwise the Channel cannot use it for the RPC. @@ -1024,7 +1024,7 @@ public void updateSubchannelAddresses( /** * Updates the addresses used for connections in the {@code Channel} that was created by {@link - * #createOobChannel(EquivalentAddressGroup, String)}. This is supperior to {@link + * #createOobChannel(EquivalentAddressGroup, String)}. This is superior to {@link * #createOobChannel(EquivalentAddressGroup, String)} when the old and new addresses overlap, * since the channel can continue using an existing connection. * @@ -1048,8 +1048,6 @@ public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressG *

The LoadBalancer is responsible for closing unused OOB channels, and closing all OOB * channels within {@link #shutdown}. * - *

NOT IMPLEMENTED: this method is currently a stub and not yet implemented by gRPC. - * * @since 1.20.0 */ public ManagedChannel createResolvingOobChannel(String target) { diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 55f73db81d4f..5b164c14ead9 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -1259,6 +1259,49 @@ public void run() { return oobChannel; } + @Override + public ManagedChannel createResolvingOobChannel(String target) { + final class ResolvingOobChannelBuilder + extends AbstractManagedChannelImplBuilder { + int defaultPort = -1; + + ResolvingOobChannelBuilder(String target) { + super(target); + } + + @Override + public int getDefaultPort() { + return defaultPort; + } + + @Override + protected ClientTransportFactory buildTransportFactory() { + throw new UnsupportedOperationException(); + } + } + + checkState(!terminated, "Channel is terminated"); + + ResolvingOobChannelBuilder builder = new ResolvingOobChannelBuilder(target); + builder.offloadExecutorPool = offloadExecutorHolder.pool; + builder.overrideAuthority(getAuthority()); + builder.nameResolverFactory(nameResolverFactory); + builder.executorPool = executorPool; + builder.maxTraceEvents = maxTraceEvents; + builder.proxyDetector = nameResolverArgs.getProxyDetector(); + builder.defaultPort = nameResolverArgs.getDefaultPort(); + builder.userAgent = userAgent; + return + new ManagedChannelImpl( + builder, + transportFactory, + backoffPolicyProvider, + balancerRpcExecutorPool, + stopwatchSupplier, + Collections.emptyList(), + timeProvider); + } + @Override public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) { checkArgument(channel instanceof OobChannel, diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 89e5be89a4e1..fb65dc45d92b 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -18,6 +18,7 @@ import static com.google.common.base.Preconditions.checkState; import static com.google.common.truth.Truth.assertThat; +import static com.google.common.truth.Truth.assertWithMessage; import static io.grpc.ConnectivityState.CONNECTING; import static io.grpc.ConnectivityState.IDLE; import static io.grpc.ConnectivityState.READY; @@ -3948,6 +3949,28 @@ public void healthCheckingConfigPropagated() throws Exception { } } + @Test + public void createResolvingOobChannel() throws Exception { + String oobTarget = "fake://second.example.com"; + URI oobUri = new URI(oobTarget); + channelBuilder + .nameResolverFactory(new FakeNameResolverFactory.Builder(expectedUri, oobUri).build()); + createChannel(); + + ManagedChannel resolvedOobChannel = null; + try { + resolvedOobChannel = helper.createResolvingOobChannel(oobTarget); + + assertWithMessage("resolving oob channel should have same authority") + .that(resolvedOobChannel.authority()) + .isEqualTo(channel.authority()); + } finally { + if (resolvedOobChannel != null) { + resolvedOobChannel.shutdownNow(); + } + } + } + private static final class ChannelBuilder extends AbstractManagedChannelImplBuilder { @@ -3979,7 +4002,7 @@ public long nextBackoffNanos() { } private static final class FakeNameResolverFactory extends NameResolver.Factory { - final URI expectedUri; + final List expectedUris; final List servers; final boolean resolvedAtStart; final Status error; @@ -3987,11 +4010,11 @@ private static final class FakeNameResolverFactory extends NameResolver.Factory final AtomicReference nextConfigOrError = new AtomicReference<>(); FakeNameResolverFactory( - URI expectedUri, + List expectedUris, List servers, boolean resolvedAtStart, Status error) { - this.expectedUri = expectedUri; + this.expectedUris = expectedUris; this.servers = servers; this.resolvedAtStart = resolvedAtStart; this.error = error; @@ -3999,12 +4022,12 @@ private static final class FakeNameResolverFactory extends NameResolver.Factory @Override public NameResolver newNameResolver(final URI targetUri, NameResolver.Args args) { - if (!expectedUri.equals(targetUri)) { + if (!expectedUris.contains(targetUri)) { return null; } assertEquals(DEFAULT_PORT, args.getDefaultPort()); FakeNameResolverFactory.FakeNameResolver resolver = - new FakeNameResolverFactory.FakeNameResolver(error); + new FakeNameResolverFactory.FakeNameResolver(targetUri, error); resolvers.add(resolver); return resolver; } @@ -4021,17 +4044,19 @@ void allResolved() { } final class FakeNameResolver extends NameResolver { + final URI targetUri; Listener2 listener; boolean shutdown; int refreshCalled; Status error; - FakeNameResolver(Status error) { + FakeNameResolver(URI targetUri, Status error) { + this.targetUri = targetUri; this.error = error; } @Override public String getServiceAuthority() { - return expectedUri.getAuthority(); + return targetUri.getAuthority(); } @Override public void start(Listener2 listener) { @@ -4072,13 +4097,13 @@ public String toString() { } static final class Builder { - final URI expectedUri; + List expectedUris; List servers = ImmutableList.of(); boolean resolvedAtStart = true; Status error = null; - Builder(URI expectedUri) { - this.expectedUri = expectedUri; + Builder(URI... expectedUris) { + this.expectedUris = Collections.unmodifiableList(Arrays.asList(expectedUris)); } FakeNameResolverFactory.Builder setServers(List servers) { @@ -4097,7 +4122,7 @@ FakeNameResolverFactory.Builder setError(Status error) { } FakeNameResolverFactory build() { - return new FakeNameResolverFactory(expectedUri, servers, resolvedAtStart, error); + return new FakeNameResolverFactory(expectedUris, servers, resolvedAtStart, error); } } }