Skip to content

Commit

Permalink
core: implement Helper#createResolvingOobChannel (#6923)
Browse files Browse the repository at this point in the history
  • Loading branch information
creamsoup committed Apr 17, 2020
1 parent 5803dfd commit 68297d6
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 16 deletions.
8 changes: 3 additions & 5 deletions api/src/main/java/io/grpc/LoadBalancer.java
Expand Up @@ -360,7 +360,7 @@ public boolean equals(Object obj) {
@Deprecated
public void handleSubchannelState(
Subchannel subchannel, ConnectivityStateInfo stateInfo) {
// Do nothing. If the implemetation doesn't implement this, it will get subchannel states from
// Do nothing. If the implementation doesn't implement this, it will get subchannel states from
// the new API. We don't throw because there may be forwarding LoadBalancers still plumb this.
}

Expand Down Expand Up @@ -507,7 +507,7 @@ private PickResult(
* A decision to proceed the RPC on a Subchannel.
*
* <p>The Subchannel should either be an original Subchannel returned by {@link
* Helper#createSubchannel Helper.createSubchannel()}, or a wrapper of it preferrably based on
* Helper#createSubchannel Helper.createSubchannel()}, or a wrapper of it preferably based on
* {@code ForwardingSubchannel}. At the very least its {@link Subchannel#getInternalSubchannel
* getInternalSubchannel()} must return the same object as the one returned by the original.
* Otherwise the Channel cannot use it for the RPC.
Expand Down Expand Up @@ -1024,7 +1024,7 @@ public void updateSubchannelAddresses(

/**
* Updates the addresses used for connections in the {@code Channel} that was created by {@link
* #createOobChannel(EquivalentAddressGroup, String)}. This is supperior to {@link
* #createOobChannel(EquivalentAddressGroup, String)}. This is superior to {@link
* #createOobChannel(EquivalentAddressGroup, String)} when the old and new addresses overlap,
* since the channel can continue using an existing connection.
*
Expand All @@ -1048,8 +1048,6 @@ public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressG
* <p>The LoadBalancer is responsible for closing unused OOB channels, and closing all OOB
* channels within {@link #shutdown}.
*
* <P>NOT IMPLEMENTED: this method is currently a stub and not yet implemented by gRPC.
*
* @since 1.20.0
*/
public ManagedChannel createResolvingOobChannel(String target) {
Expand Down
43 changes: 43 additions & 0 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Expand Up @@ -1259,6 +1259,49 @@ public void run() {
return oobChannel;
}

@Override
public ManagedChannel createResolvingOobChannel(String target) {
final class ResolvingOobChannelBuilder
extends AbstractManagedChannelImplBuilder<ResolvingOobChannelBuilder> {
int defaultPort = -1;

ResolvingOobChannelBuilder(String target) {
super(target);
}

@Override
public int getDefaultPort() {
return defaultPort;
}

@Override
protected ClientTransportFactory buildTransportFactory() {
throw new UnsupportedOperationException();
}
}

checkState(!terminated, "Channel is terminated");

ResolvingOobChannelBuilder builder = new ResolvingOobChannelBuilder(target);
builder.offloadExecutorPool = offloadExecutorHolder.pool;
builder.overrideAuthority(getAuthority());
builder.nameResolverFactory(nameResolverFactory);
builder.executorPool = executorPool;
builder.maxTraceEvents = maxTraceEvents;
builder.proxyDetector = nameResolverArgs.getProxyDetector();
builder.defaultPort = nameResolverArgs.getDefaultPort();
builder.userAgent = userAgent;
return
new ManagedChannelImpl(
builder,
transportFactory,
backoffPolicyProvider,
balancerRpcExecutorPool,
stopwatchSupplier,
Collections.<ClientInterceptor>emptyList(),
timeProvider);
}

@Override
public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) {
checkArgument(channel instanceof OobChannel,
Expand Down
47 changes: 36 additions & 11 deletions core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth.assertWithMessage;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY;
Expand Down Expand Up @@ -3948,6 +3949,28 @@ public void healthCheckingConfigPropagated() throws Exception {
}
}

@Test
public void createResolvingOobChannel() throws Exception {
String oobTarget = "fake://second.example.com";
URI oobUri = new URI(oobTarget);
channelBuilder
.nameResolverFactory(new FakeNameResolverFactory.Builder(expectedUri, oobUri).build());
createChannel();

ManagedChannel resolvedOobChannel = null;
try {
resolvedOobChannel = helper.createResolvingOobChannel(oobTarget);

assertWithMessage("resolving oob channel should have same authority")
.that(resolvedOobChannel.authority())
.isEqualTo(channel.authority());
} finally {
if (resolvedOobChannel != null) {
resolvedOobChannel.shutdownNow();
}
}
}

private static final class ChannelBuilder
extends AbstractManagedChannelImplBuilder<ChannelBuilder> {

Expand Down Expand Up @@ -3979,32 +4002,32 @@ public long nextBackoffNanos() {
}

private static final class FakeNameResolverFactory extends NameResolver.Factory {
final URI expectedUri;
final List<URI> expectedUris;
final List<EquivalentAddressGroup> servers;
final boolean resolvedAtStart;
final Status error;
final ArrayList<FakeNameResolverFactory.FakeNameResolver> resolvers = new ArrayList<>();
final AtomicReference<ConfigOrError> nextConfigOrError = new AtomicReference<>();

FakeNameResolverFactory(
URI expectedUri,
List<URI> expectedUris,
List<EquivalentAddressGroup> servers,
boolean resolvedAtStart,
Status error) {
this.expectedUri = expectedUri;
this.expectedUris = expectedUris;
this.servers = servers;
this.resolvedAtStart = resolvedAtStart;
this.error = error;
}

@Override
public NameResolver newNameResolver(final URI targetUri, NameResolver.Args args) {
if (!expectedUri.equals(targetUri)) {
if (!expectedUris.contains(targetUri)) {
return null;
}
assertEquals(DEFAULT_PORT, args.getDefaultPort());
FakeNameResolverFactory.FakeNameResolver resolver =
new FakeNameResolverFactory.FakeNameResolver(error);
new FakeNameResolverFactory.FakeNameResolver(targetUri, error);
resolvers.add(resolver);
return resolver;
}
Expand All @@ -4021,17 +4044,19 @@ void allResolved() {
}

final class FakeNameResolver extends NameResolver {
final URI targetUri;
Listener2 listener;
boolean shutdown;
int refreshCalled;
Status error;

FakeNameResolver(Status error) {
FakeNameResolver(URI targetUri, Status error) {
this.targetUri = targetUri;
this.error = error;
}

@Override public String getServiceAuthority() {
return expectedUri.getAuthority();
return targetUri.getAuthority();
}

@Override public void start(Listener2 listener) {
Expand Down Expand Up @@ -4072,13 +4097,13 @@ public String toString() {
}

static final class Builder {
final URI expectedUri;
List<URI> expectedUris;
List<EquivalentAddressGroup> servers = ImmutableList.of();
boolean resolvedAtStart = true;
Status error = null;

Builder(URI expectedUri) {
this.expectedUri = expectedUri;
Builder(URI... expectedUris) {
this.expectedUris = Collections.unmodifiableList(Arrays.asList(expectedUris));
}

FakeNameResolverFactory.Builder setServers(List<EquivalentAddressGroup> servers) {
Expand All @@ -4097,7 +4122,7 @@ FakeNameResolverFactory.Builder setError(Status error) {
}

FakeNameResolverFactory build() {
return new FakeNameResolverFactory(expectedUri, servers, resolvedAtStart, error);
return new FakeNameResolverFactory(expectedUris, servers, resolvedAtStart, error);
}
}
}
Expand Down

0 comments on commit 68297d6

Please sign in to comment.