Skip to content

Commit

Permalink
grpclb: support multiple authorities in lb backends for all SRV recor…
Browse files Browse the repository at this point in the history
…ds (#7951)
  • Loading branch information
YifeiZhuang committed Mar 11, 2021
1 parent 972fda2 commit 6a9c990
Show file tree
Hide file tree
Showing 11 changed files with 192 additions and 188 deletions.
20 changes: 17 additions & 3 deletions api/src/main/java/io/grpc/LoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -930,11 +930,16 @@ public Subchannel createSubchannel(CreateSubchannelArgs args) {
*
* @since 1.4.0
*/
// TODO(ejona): Allow passing a List<EAG> here and to updateOobChannelAddresses, but want to
// wait until https://github.com/grpc/grpc-java/issues/4469 is done.
// https://github.com/grpc/grpc-java/issues/4618
public abstract ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority);

/**
* Accept a list of EAG for multiple authorities: https://github.com/grpc/grpc-java/issues/4618
* */
public ManagedChannel createOobChannel(List<EquivalentAddressGroup> eag,
String authority) {
throw new UnsupportedOperationException();
}

/**
* Updates the addresses used for connections in the {@code Channel} that was created by {@link
* #createOobChannel(EquivalentAddressGroup, String)}. This is superior to {@link
Expand All @@ -949,6 +954,15 @@ public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressG
throw new UnsupportedOperationException();
}

/**
* Updates the addresses with a new EAG list. Connection is continued when old and new addresses
* overlap.
* */
public void updateOobChannelAddresses(ManagedChannel channel,
List<EquivalentAddressGroup> eag) {
throw new UnsupportedOperationException();
}

/**
* Creates an out-of-band channel for LoadBalancer's own RPC needs, e.g., talking to an external
* load-balancer service, that is specified by a target string. See the documentation on
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/io/grpc/internal/InternalSubchannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -339,12 +339,12 @@ public void updateAddresses(final List<EquivalentAddressGroup> newAddressGroups)
Preconditions.checkNotNull(newAddressGroups, "newAddressGroups");
checkListHasNoNulls(newAddressGroups, "newAddressGroups contains null entry");
Preconditions.checkArgument(!newAddressGroups.isEmpty(), "newAddressGroups is empty");
final List<EquivalentAddressGroup> newImmutableAddressGroups =
Collections.unmodifiableList(new ArrayList<>(newAddressGroups));

syncContext.execute(new Runnable() {
@Override
public void run() {
List<EquivalentAddressGroup> newImmutableAddressGroups =
Collections.unmodifiableList(new ArrayList<>(newAddressGroups));
ManagedClientTransport savedTransport = null;
SocketAddress previousAddress = addressIndex.getCurrentAddress();
addressIndex.updateGroups(newImmutableAddressGroups);
Expand Down
14 changes: 13 additions & 1 deletion core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -1465,6 +1465,12 @@ public void run() {

@Override
public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) {
return createOobChannel(Collections.singletonList(addressGroup), authority);
}

@Override
public ManagedChannel createOobChannel(List<EquivalentAddressGroup> addressGroup,
String authority) {
// TODO(ejona): can we be even stricter? Like terminating?
checkState(!terminated, "Channel is terminated");
long oobChannelCreationTime = timeProvider.currentTimeNanos();
Expand Down Expand Up @@ -1505,7 +1511,7 @@ void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
}

final InternalSubchannel internalSubchannel = new InternalSubchannel(
Collections.singletonList(addressGroup),
addressGroup,
authority, userAgent, backoffPolicyProvider, oobTransportFactory,
oobTransportFactory.getScheduledExecutorService(), stopwatchSupplier, syncContext,
// All callback methods are run from syncContext
Expand Down Expand Up @@ -1625,6 +1631,12 @@ public ChannelCredentials getUnsafeChannelCredentials() {

@Override
public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) {
updateOobChannelAddresses(channel, Collections.singletonList(eag));
}

@Override
public void updateOobChannelAddresses(ManagedChannel channel,
List<EquivalentAddressGroup> eag) {
checkArgument(channel instanceof OobChannel,
"channel must have been returned from createOobChannel");
((OobChannel) channel).updateAddresses(eag);
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/io/grpc/internal/OobChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ public String toString() {
delayedTransport.reprocess(subchannelPicker);
}

void updateAddresses(EquivalentAddressGroup eag) {
subchannel.updateAddresses(Collections.singletonList(eag));
void updateAddresses(List<EquivalentAddressGroup> eag) {
subchannel.updateAddresses(eag);
}

@Override
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/java/io/grpc/util/ForwardingLoadBalancerHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.grpc.NameResolver;
import io.grpc.NameResolverRegistry;
import io.grpc.SynchronizationContext;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;

@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771")
Expand All @@ -50,11 +51,21 @@ public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String autho
return delegate().createOobChannel(eag, authority);
}

@Override
public ManagedChannel createOobChannel(List<EquivalentAddressGroup> eag, String authority) {
return delegate().createOobChannel(eag, authority);
}

@Override
public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) {
delegate().updateOobChannelAddresses(channel, eag);
}

@Override
public void updateOobChannelAddresses(ManagedChannel channel, List<EquivalentAddressGroup> eag) {
delegate().updateOobChannelAddresses(channel, eag);
}

@Deprecated
@Override
public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(String target) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ public void oobTransportDoesNotAffectIdleness() {
assertFalse(channel.inUseStateAggregator.isInUse());

// Now make an RPC on an OOB channel
ManagedChannel oob = helper.createOobChannel(servers.get(0), "oobauthority");
ManagedChannel oob = helper.createOobChannel(servers, "oobauthority");
verify(mockTransportFactory, never())
.newClientTransport(
any(SocketAddress.class),
Expand Down Expand Up @@ -438,13 +438,13 @@ public void updateOobChannelAddresses_newAddressConnects() {
verify(mockLoadBalancerProvider).newLoadBalancer(helperCaptor.capture());
deliverResolutionResult();
Helper helper = helperCaptor.getValue();
ManagedChannel oobChannel = helper.createOobChannel(servers.get(0), "localhost");
ManagedChannel oobChannel = helper.createOobChannel(servers.subList(0,1), "localhost");

oobChannel.newCall(method, CallOptions.DEFAULT).start(mockCallListener, new Metadata());
MockClientTransportInfo t0 = newTransports.poll();
t0.listener.transportReady();

helper.updateOobChannelAddresses(oobChannel, servers.get(1));
helper.updateOobChannelAddresses(oobChannel, servers.subList(1,2));

oobChannel.newCall(method, CallOptions.DEFAULT).start(mockCallListener, new Metadata());
MockClientTransportInfo t1 = newTransports.poll();
Expand All @@ -462,15 +462,16 @@ public void updateOobChannelAddresses_existingAddressDoesNotConnect() {
verify(mockLoadBalancerProvider).newLoadBalancer(helperCaptor.capture());
Helper helper = helperCaptor.getValue();
deliverResolutionResult();
ManagedChannel oobChannel = helper.createOobChannel(servers.get(0), "localhost");
ManagedChannel oobChannel = helper.createOobChannel(servers.subList(0,1), "localhost");

oobChannel.newCall(method, CallOptions.DEFAULT).start(mockCallListener, new Metadata());
MockClientTransportInfo t0 = newTransports.poll();
t0.listener.transportReady();

List<SocketAddress> changedList = new ArrayList<>(servers.get(0).getAddresses());
changedList.add(new FakeSocketAddress("aDifferentServer"));
helper.updateOobChannelAddresses(oobChannel, new EquivalentAddressGroup(changedList));
helper.updateOobChannelAddresses(oobChannel, Collections.singletonList(
new EquivalentAddressGroup(changedList)));

oobChannel.newCall(method, CallOptions.DEFAULT).start(mockCallListener, new Metadata());
assertNull(newTransports.poll());
Expand Down
50 changes: 32 additions & 18 deletions core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,8 @@ public void channelzMembership_subchannel() throws Exception {
@Test
public void channelzMembership_oob() throws Exception {
createChannel();
OobChannel oob = (OobChannel) helper.createOobChannel(addressGroup, AUTHORITY);
OobChannel oob = (OobChannel) helper.createOobChannel(
Collections.singletonList(addressGroup), AUTHORITY);
// oob channels are not root channels
assertNull(channelz.getRootChannel(oob.getLogId().getId()));
assertTrue(channelz.containsSubchannel(oob.getLogId()));
Expand Down Expand Up @@ -1621,8 +1622,10 @@ public void subchannelsNoConnectionShutdownNow() {
public void oobchannels() {
createChannel();

ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1authority");
ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2authority");
ManagedChannel oob1 = helper.createOobChannel(
Collections.singletonList(addressGroup), "oob1authority");
ManagedChannel oob2 = helper.createOobChannel(
Collections.singletonList(addressGroup), "oob2authority");
verify(balancerRpcExecutorPool, times(2)).getObject();

assertEquals("oob1authority", oob1.authority());
Expand Down Expand Up @@ -1755,7 +1758,8 @@ public void oobChannelHasNoChannelCallCredentials() {
.containsExactly(channelCredValue, callCredValue).inOrder();

// Verify that the oob channel does not
ManagedChannel oob = helper.createOobChannel(addressGroup, "oobauthority");
ManagedChannel oob = helper.createOobChannel(
Collections.singletonList(addressGroup), "oobauthority");

headers = new Metadata();
call = oob.newCall(method, callOptions);
Expand Down Expand Up @@ -1886,8 +1890,10 @@ public SwapChannelCredentialsResult answer(InvocationOnMock invocation) {
@Test
public void oobChannelsWhenChannelShutdownNow() {
createChannel();
ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1Authority");
ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2Authority");
ManagedChannel oob1 = helper.createOobChannel(
Collections.singletonList(addressGroup), "oob1Authority");
ManagedChannel oob2 = helper.createOobChannel(
Collections.singletonList(addressGroup), "oob2Authority");

oob1.newCall(method, CallOptions.DEFAULT).start(mockCallListener, new Metadata());
oob2.newCall(method, CallOptions.DEFAULT).start(mockCallListener2, new Metadata());
Expand Down Expand Up @@ -1915,8 +1921,10 @@ public void oobChannelsWhenChannelShutdownNow() {
@Test
public void oobChannelsNoConnectionShutdown() {
createChannel();
ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1Authority");
ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2Authority");
ManagedChannel oob1 = helper.createOobChannel(
Collections.singletonList(addressGroup), "oob1Authority");
ManagedChannel oob2 = helper.createOobChannel(
Collections.singletonList(addressGroup), "oob2Authority");
channel.shutdown();

verify(mockLoadBalancer).shutdown();
Expand All @@ -1934,8 +1942,8 @@ public void oobChannelsNoConnectionShutdown() {
@Test
public void oobChannelsNoConnectionShutdownNow() {
createChannel();
helper.createOobChannel(addressGroup, "oob1Authority");
helper.createOobChannel(addressGroup, "oob2Authority");
helper.createOobChannel(Collections.singletonList(addressGroup), "oob1Authority");
helper.createOobChannel(Collections.singletonList(addressGroup), "oob2Authority");
channel.shutdownNow();

verify(mockLoadBalancer).shutdown();
Expand Down Expand Up @@ -2116,7 +2124,8 @@ private void subtestNameResolutionRefreshWhenConnectionFailed(
channelBuilder.nameResolverFactory(nameResolverFactory);
createChannel();
if (isOobChannel) {
OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "oobAuthority");
OobChannel oobChannel = (OobChannel) helper.createOobChannel(
Collections.singletonList(addressGroup), "oobAuthority");
oobChannel.getSubchannel().requestConnection();
} else {
Subchannel subchannel =
Expand Down Expand Up @@ -3183,7 +3192,8 @@ public void channelTracing_subchannelStateChangeEvent() throws Exception {
public void channelTracing_oobChannelStateChangeEvent() throws Exception {
channelBuilder.maxTraceEvents(10);
createChannel();
OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "authority");
OobChannel oobChannel = (OobChannel) helper.createOobChannel(
Collections.singletonList(addressGroup), "authority");
timer.forwardNanos(1234);
oobChannel.handleSubchannelStateChange(
ConnectivityStateInfo.forNonError(ConnectivityState.CONNECTING));
Expand All @@ -3199,21 +3209,22 @@ public void channelTracing_oobChannelCreationEvents() throws Exception {
channelBuilder.maxTraceEvents(10);
createChannel();
timer.forwardNanos(1234);
OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "authority");
OobChannel oobChannel = (OobChannel) helper.createOobChannel(
Collections.singletonList(addressGroup), "authority");
assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
.setDescription("Child OobChannel created")
.setSeverity(ChannelTrace.Event.Severity.CT_INFO)
.setTimestampNanos(timer.getTicker().read())
.setChannelRef(oobChannel)
.build());
assertThat(getStats(oobChannel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
.setDescription("OobChannel for [[test-addr]/{}] created")
.setDescription("OobChannel for [[[test-addr]/{}]] created")
.setSeverity(ChannelTrace.Event.Severity.CT_INFO)
.setTimestampNanos(timer.getTicker().read())
.build());
assertThat(getStats(oobChannel.getInternalSubchannel()).channelTrace.events).contains(
new ChannelTrace.Event.Builder()
.setDescription("Subchannel for [[test-addr]/{}] created")
.setDescription("Subchannel for [[[test-addr]/{}]] created")
.setSeverity(ChannelTrace.Event.Severity.CT_INFO)
.setTimestampNanos(timer.getTicker().read())
.build());
Expand Down Expand Up @@ -3349,7 +3360,8 @@ private void channelsAndSubchannels_oob_instrumented0(boolean success) throws Ex
ClientStream mockStream = mock(ClientStream.class);
createChannel();

OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "oobauthority");
OobChannel oobChannel = (OobChannel) helper.createOobChannel(
Collections.singletonList(addressGroup), "oobauthority");
AbstractSubchannel oobSubchannel = (AbstractSubchannel) oobChannel.getSubchannel();
FakeClock callExecutor = new FakeClock();
CallOptions options =
Expand Down Expand Up @@ -3411,15 +3423,17 @@ public void channelsAndSubchannels_oob_instrumented_name() throws Exception {
createChannel();

String authority = "oobauthority";
OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, authority);
OobChannel oobChannel = (OobChannel) helper.createOobChannel(
Collections.singletonList(addressGroup), authority);
assertEquals(authority, getStats(oobChannel).target);
}

@Test
public void channelsAndSubchannels_oob_instrumented_state() throws Exception {
createChannel();

OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "oobauthority");
OobChannel oobChannel = (OobChannel) helper.createOobChannel(
Collections.singletonList(addressGroup), "oobauthority");
assertEquals(IDLE, getStats(oobChannel).state);

oobChannel.getSubchannel().requestConnection();
Expand Down
31 changes: 17 additions & 14 deletions grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,26 +75,28 @@ class GrpclbLoadBalancer extends LoadBalancer {
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
Attributes attributes = resolvedAddresses.getAttributes();
List<EquivalentAddressGroup> newLbAddresses = attributes.get(GrpclbConstants.ATTR_LB_ADDRS);
if ((newLbAddresses == null || newLbAddresses.isEmpty())
&& resolvedAddresses.getAddresses().isEmpty()) {
if (newLbAddresses == null) {
newLbAddresses = Collections.emptyList();
}
if (newLbAddresses.isEmpty() && resolvedAddresses.getAddresses().isEmpty()) {
handleNameResolutionError(
Status.UNAVAILABLE.withDescription("No backend or balancer addresses found"));
return;
}
List<LbAddressGroup> newLbAddressGroups = new ArrayList<>();

if (newLbAddresses != null) {
for (EquivalentAddressGroup lbAddr : newLbAddresses) {
String lbAddrAuthority = lbAddr.getAttributes().get(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY);
if (lbAddrAuthority == null) {
throw new AssertionError(
"This is a bug: LB address " + lbAddr + " does not have an authority.");
}
newLbAddressGroups.add(new LbAddressGroup(lbAddr, lbAddrAuthority));
List<EquivalentAddressGroup> overrideAuthorityLbAddresses =
new ArrayList<>(newLbAddresses.size());
for (EquivalentAddressGroup lbAddr : newLbAddresses) {
String lbAddrAuthority = lbAddr.getAttributes().get(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY);
if (lbAddrAuthority == null) {
throw new AssertionError(
"This is a bug: LB address " + lbAddr + " does not have an authority.");
}
Attributes attrs = lbAddr.getAttributes().toBuilder()
.set(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE, lbAddrAuthority)
.build();
overrideAuthorityLbAddresses.add(new EquivalentAddressGroup(lbAddr.getAddresses(), attrs));
}

newLbAddressGroups = Collections.unmodifiableList(newLbAddressGroups);
List<EquivalentAddressGroup> newBackendServers =
Collections.unmodifiableList(resolvedAddresses.getAddresses());
GrpclbConfig newConfig = (GrpclbConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
Expand All @@ -106,7 +108,8 @@ public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
helper.getChannelLogger().log(ChannelLogLevel.INFO, "Config: " + newConfig);
recreateStates();
}
grpclbState.handleAddresses(newLbAddressGroups, newBackendServers);
grpclbState.handleAddresses(Collections.unmodifiableList(overrideAuthorityLbAddresses),
newBackendServers);
}

@Override
Expand Down

0 comments on commit 6a9c990

Please sign in to comment.