Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: implement Helper#createResolvingOobChannel #6923

Merged
merged 1 commit into from Apr 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 3 additions & 5 deletions api/src/main/java/io/grpc/LoadBalancer.java
Expand Up @@ -360,7 +360,7 @@ public boolean equals(Object obj) {
@Deprecated
public void handleSubchannelState(
Subchannel subchannel, ConnectivityStateInfo stateInfo) {
// Do nothing. If the implemetation doesn't implement this, it will get subchannel states from
// Do nothing. If the implementation doesn't implement this, it will get subchannel states from
// the new API. We don't throw because there may be forwarding LoadBalancers still plumb this.
}

Expand Down Expand Up @@ -507,7 +507,7 @@ private PickResult(
* A decision to proceed the RPC on a Subchannel.
*
* <p>The Subchannel should either be an original Subchannel returned by {@link
* Helper#createSubchannel Helper.createSubchannel()}, or a wrapper of it preferrably based on
* Helper#createSubchannel Helper.createSubchannel()}, or a wrapper of it preferably based on
* {@code ForwardingSubchannel}. At the very least its {@link Subchannel#getInternalSubchannel
* getInternalSubchannel()} must return the same object as the one returned by the original.
* Otherwise the Channel cannot use it for the RPC.
Expand Down Expand Up @@ -1024,7 +1024,7 @@ public void updateSubchannelAddresses(

/**
* Updates the addresses used for connections in the {@code Channel} that was created by {@link
* #createOobChannel(EquivalentAddressGroup, String)}. This is supperior to {@link
* #createOobChannel(EquivalentAddressGroup, String)}. This is superior to {@link
* #createOobChannel(EquivalentAddressGroup, String)} when the old and new addresses overlap,
* since the channel can continue using an existing connection.
*
Expand All @@ -1048,8 +1048,6 @@ public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressG
* <p>The LoadBalancer is responsible for closing unused OOB channels, and closing all OOB
* channels within {@link #shutdown}.
*
* <P>NOT IMPLEMENTED: this method is currently a stub and not yet implemented by gRPC.
*
* @since 1.20.0
*/
public ManagedChannel createResolvingOobChannel(String target) {
Expand Down
43 changes: 43 additions & 0 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Expand Up @@ -1259,6 +1259,49 @@ public void run() {
return oobChannel;
}

@Override
public ManagedChannel createResolvingOobChannel(String target) {
final class ResolvingOobChannelBuilder
extends AbstractManagedChannelImplBuilder<ResolvingOobChannelBuilder> {
int defaultPort = -1;

ResolvingOobChannelBuilder(String target) {
super(target);
}

@Override
public int getDefaultPort() {
return defaultPort;
}

@Override
protected ClientTransportFactory buildTransportFactory() {
throw new UnsupportedOperationException();
}
}

checkState(!terminated, "Channel is terminated");

ResolvingOobChannelBuilder builder = new ResolvingOobChannelBuilder(target);
builder.offloadExecutorPool = offloadExecutorHolder.pool;
builder.overrideAuthority(getAuthority());
builder.nameResolverFactory(nameResolverFactory);
builder.executorPool = executorPool;
builder.maxTraceEvents = maxTraceEvents;
builder.proxyDetector = nameResolverArgs.getProxyDetector();
builder.defaultPort = nameResolverArgs.getDefaultPort();
builder.userAgent = userAgent;
return
new ManagedChannelImpl(
builder,
transportFactory,
backoffPolicyProvider,
balancerRpcExecutorPool,
stopwatchSupplier,
Collections.<ClientInterceptor>emptyList(),
timeProvider);
}

@Override
public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) {
checkArgument(channel instanceof OobChannel,
Expand Down
47 changes: 36 additions & 11 deletions core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth.assertWithMessage;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY;
Expand Down Expand Up @@ -3948,6 +3949,28 @@ public void healthCheckingConfigPropagated() throws Exception {
}
}

@Test
public void createResolvingOobChannel() throws Exception {
String oobTarget = "fake://second.example.com";
URI oobUri = new URI(oobTarget);
channelBuilder
.nameResolverFactory(new FakeNameResolverFactory.Builder(expectedUri, oobUri).build());
createChannel();

ManagedChannel resolvedOobChannel = null;
try {
resolvedOobChannel = helper.createResolvingOobChannel(oobTarget);

assertWithMessage("resolving oob channel should have same authority")
.that(resolvedOobChannel.authority())
.isEqualTo(channel.authority());
} finally {
if (resolvedOobChannel != null) {
resolvedOobChannel.shutdownNow();
}
}
}

private static final class ChannelBuilder
extends AbstractManagedChannelImplBuilder<ChannelBuilder> {

Expand Down Expand Up @@ -3979,32 +4002,32 @@ public long nextBackoffNanos() {
}

private static final class FakeNameResolverFactory extends NameResolver.Factory {
final URI expectedUri;
final List<URI> expectedUris;
final List<EquivalentAddressGroup> servers;
final boolean resolvedAtStart;
final Status error;
final ArrayList<FakeNameResolverFactory.FakeNameResolver> resolvers = new ArrayList<>();
final AtomicReference<ConfigOrError> nextConfigOrError = new AtomicReference<>();

FakeNameResolverFactory(
URI expectedUri,
List<URI> expectedUris,
List<EquivalentAddressGroup> servers,
boolean resolvedAtStart,
Status error) {
this.expectedUri = expectedUri;
this.expectedUris = expectedUris;
this.servers = servers;
this.resolvedAtStart = resolvedAtStart;
this.error = error;
}

@Override
public NameResolver newNameResolver(final URI targetUri, NameResolver.Args args) {
if (!expectedUri.equals(targetUri)) {
if (!expectedUris.contains(targetUri)) {
return null;
}
assertEquals(DEFAULT_PORT, args.getDefaultPort());
FakeNameResolverFactory.FakeNameResolver resolver =
new FakeNameResolverFactory.FakeNameResolver(error);
new FakeNameResolverFactory.FakeNameResolver(targetUri, error);
resolvers.add(resolver);
return resolver;
}
Expand All @@ -4021,17 +4044,19 @@ void allResolved() {
}

final class FakeNameResolver extends NameResolver {
final URI targetUri;
Listener2 listener;
boolean shutdown;
int refreshCalled;
Status error;

FakeNameResolver(Status error) {
FakeNameResolver(URI targetUri, Status error) {
this.targetUri = targetUri;
this.error = error;
}

@Override public String getServiceAuthority() {
return expectedUri.getAuthority();
return targetUri.getAuthority();
}

@Override public void start(Listener2 listener) {
Expand Down Expand Up @@ -4072,13 +4097,13 @@ public String toString() {
}

static final class Builder {
final URI expectedUri;
List<URI> expectedUris;
List<EquivalentAddressGroup> servers = ImmutableList.of();
boolean resolvedAtStart = true;
Status error = null;

Builder(URI expectedUri) {
this.expectedUri = expectedUri;
Builder(URI... expectedUris) {
this.expectedUris = Collections.unmodifiableList(Arrays.asList(expectedUris));
}

FakeNameResolverFactory.Builder setServers(List<EquivalentAddressGroup> servers) {
Expand All @@ -4097,7 +4122,7 @@ FakeNameResolverFactory.Builder setError(Status error) {
}

FakeNameResolverFactory build() {
return new FakeNameResolverFactory(expectedUri, servers, resolvedAtStart, error);
return new FakeNameResolverFactory(expectedUris, servers, resolvedAtStart, error);
}
}
}
Expand Down