Skip to content

Commit

Permalink
Support setting gRPClb initial fallback timeout by service config
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhouyihaiDing committed Mar 10, 2022
1 parent bfb970c commit a4fe46c
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 28 deletions.
23 changes: 17 additions & 6 deletions grpclb/src/main/java/io/grpc/grpclb/GrpclbConfig.java
Expand Up @@ -16,6 +16,7 @@

package io.grpc.grpclb;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.base.MoreObjects;
Expand All @@ -28,24 +29,31 @@ final class GrpclbConfig {
private final Mode mode;
@Nullable
private final String serviceName;
private final long fallbackTimeoutMs;

private GrpclbConfig(Mode mode, @Nullable String serviceName) {
private GrpclbConfig(Mode mode, @Nullable String serviceName, long fallbackTimeoutMs) {
this.mode = checkNotNull(mode, "mode");
this.serviceName = serviceName;
this.fallbackTimeoutMs = fallbackTimeoutMs;
}

static GrpclbConfig create(Mode mode) {
return create(mode, null);
return create(mode, null, GrpclbState.FALLBACK_TIMEOUT_MS);
}

static GrpclbConfig create(Mode mode, @Nullable String serviceName) {
return new GrpclbConfig(mode, serviceName);
static GrpclbConfig create(Mode mode, @Nullable String serviceName, long fallbackTimeoutMs) {
checkArgument(fallbackTimeoutMs > 0, "Invalid timeout (%s)", fallbackTimeoutMs);
return new GrpclbConfig(mode, serviceName, fallbackTimeoutMs);
}

Mode getMode() {
return mode;
}

long getFallbackTimeoutMs() {
return fallbackTimeoutMs;
}

/**
* If specified, it overrides the name of the sevice name to be sent to the balancer. if not, the
* target to be sent to the balancer will continue to be obtained from the target URI passed
Expand All @@ -65,19 +73,22 @@ public boolean equals(Object o) {
return false;
}
GrpclbConfig that = (GrpclbConfig) o;
return mode == that.mode && Objects.equal(serviceName, that.serviceName);
return mode == that.mode
&& Objects.equal(serviceName, that.serviceName)
&& fallbackTimeoutMs == that.fallbackTimeoutMs;
}

@Override
public int hashCode() {
return Objects.hashCode(mode, serviceName);
return Objects.hashCode(mode, serviceName, fallbackTimeoutMs);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("mode", mode)
.add("serviceName", serviceName)
.add("fallbackTimeoutMs", fallbackTimeoutMs)
.toString();
}
}
Expand Up @@ -89,6 +89,13 @@ ConfigOrError parseLoadBalancingConfigPolicyInternal(
}
String serviceName = JsonUtil.getString(rawLoadBalancingPolicyConfig, "serviceName");
List<?> rawChildPolicies = JsonUtil.getList(rawLoadBalancingPolicyConfig, "childPolicy");
Long initialFallbackTimeoutNs =
JsonUtil.getStringAsDuration(rawLoadBalancingPolicyConfig, "initialFallbackTimeout");
long timeoutMs = GrpclbState.FALLBACK_TIMEOUT_MS;
if (initialFallbackTimeoutNs != null) {
timeoutMs = initialFallbackTimeoutNs / 1000000;
}

List<LbConfig> childPolicies = null;
if (rawChildPolicies != null) {
childPolicies =
Expand All @@ -97,17 +104,20 @@ ConfigOrError parseLoadBalancingConfigPolicyInternal(
}

if (childPolicies == null || childPolicies.isEmpty()) {
return ConfigOrError.fromConfig(GrpclbConfig.create(DEFAULT_MODE, serviceName));
return ConfigOrError.fromConfig(
GrpclbConfig.create(DEFAULT_MODE, serviceName, GrpclbState.FALLBACK_TIMEOUT_MS));
}

List<String> policiesTried = new ArrayList<>();
for (LbConfig childPolicy : childPolicies) {
String childPolicyName = childPolicy.getPolicyName();
switch (childPolicyName) {
case "round_robin":
return ConfigOrError.fromConfig(GrpclbConfig.create(Mode.ROUND_ROBIN, serviceName));
return ConfigOrError.fromConfig(
GrpclbConfig.create(Mode.ROUND_ROBIN, serviceName, timeoutMs));
case "pick_first":
return ConfigOrError.fromConfig(GrpclbConfig.create(Mode.PICK_FIRST, serviceName));
return ConfigOrError.fromConfig(
GrpclbConfig.create(Mode.PICK_FIRST, serviceName, timeoutMs));
default:
policiesTried.add(childPolicyName);
}
Expand Down
11 changes: 8 additions & 3 deletions grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
Expand Up @@ -137,6 +137,7 @@ enum Mode {
}

private final String serviceName;
private final long fallbackTimeoutMs;
private final Helper helper;
private final Context context;
private final SynchronizationContext syncContext;
Expand Down Expand Up @@ -220,6 +221,7 @@ public void onSubchannelState(
} else {
this.serviceName = checkNotNull(helper.getAuthority(), "helper returns null authority");
}
this.fallbackTimeoutMs = config.getFallbackTimeoutMs();
this.logger = checkNotNull(helper.getChannelLogger(), "logger");
logger.log(ChannelLogLevel.INFO, "[grpclb-<{0}>] Created", serviceName);
}
Expand Down Expand Up @@ -290,9 +292,12 @@ void handleAddresses(
// Start the fallback timer if it's never started and we are not already using fallback
// backends.
if (fallbackTimer == null && !usingFallbackBackends) {
fallbackTimer = syncContext.schedule(
new FallbackModeTask(BALANCER_TIMEOUT_STATUS), FALLBACK_TIMEOUT_MS,
TimeUnit.MILLISECONDS, timerService);
fallbackTimer =
syncContext.schedule(
new FallbackModeTask(BALANCER_TIMEOUT_STATUS),
fallbackTimeoutMs,
TimeUnit.MILLISECONDS,
timerService);
}
}
if (usingFallbackBackends) {
Expand Down
Expand Up @@ -19,6 +19,7 @@
import static com.google.common.truth.Truth.assertThat;

import io.grpc.NameResolver.ConfigOrError;
import io.grpc.Status;
import io.grpc.grpclb.GrpclbState.Mode;
import io.grpc.internal.JsonParser;
import java.util.Map;
Expand All @@ -41,6 +42,7 @@ public void retrieveModeFromLbConfig_pickFirst() throws Exception {
GrpclbConfig config = (GrpclbConfig) configOrError.getConfig();
assertThat(config.getMode()).isEqualTo(Mode.PICK_FIRST);
assertThat(config.getServiceName()).isNull();
assertThat(config.getFallbackTimeoutMs()).isEqualTo(GrpclbState.FALLBACK_TIMEOUT_MS);
}

@Test
Expand All @@ -54,6 +56,54 @@ public void retrieveModeFromLbConfig_roundRobin() throws Exception {
GrpclbConfig config = (GrpclbConfig) configOrError.getConfig();
assertThat(config.getMode()).isEqualTo(Mode.ROUND_ROBIN);
assertThat(config.getServiceName()).isNull();
assertThat(config.getFallbackTimeoutMs()).isEqualTo(GrpclbState.FALLBACK_TIMEOUT_MS);
}

@Test
public void setTimeoutToLbConfig() throws Exception {
String lbConfig =
"{\"initialFallbackTimeout\" : \"123s\", \"childPolicy\" : [{\"pick_first\" : {}},"
+ " {\"round_robin\" : {}}]}";

ConfigOrError configOrError =
provider.parseLoadBalancingPolicyConfig(parseJsonObject(lbConfig));

assertThat(configOrError.getConfig()).isNotNull();
GrpclbConfig config = (GrpclbConfig) configOrError.getConfig();
assertThat(config.getMode()).isEqualTo(Mode.PICK_FIRST);
assertThat(config.getServiceName()).isNull();
assertThat(config.getFallbackTimeoutMs()).isEqualTo(123000);
}

@Test
public void setInvalidTimeoutToLbConfig() throws Exception {
String lbConfig =
"{\"initialFallbackTimeout\" : \"-1s\", \"childPolicy\" : [{\"pick_first\" : {}},"
+ " {\"round_robin\" : {}}]}";

ConfigOrError configOrError =
provider.parseLoadBalancingPolicyConfig(parseJsonObject(lbConfig));

assertThat(configOrError.getConfig()).isNull();
assertThat(configOrError.getError()).isNotNull();
Status errorStatus = configOrError.getError();
assertThat(errorStatus.getCause()).hasMessageThat().isEqualTo("Invalid timeout (-1000)");
}

@Test
public void setInvalidTimeoutDurationProtoToLbConfig() throws Exception {
String lbConfig =
"{\"initialFallbackTimeout\" : \"1000\", \"childPolicy\" : [{\"pick_first\" : {}},"
+ " {\"round_robin\" : {}}]}";

ConfigOrError configOrError =
provider.parseLoadBalancingPolicyConfig(parseJsonObject(lbConfig));

assertThat(configOrError.getError()).isNotNull();
Status errorStatus = configOrError.getError();
assertThat(errorStatus.getCause())
.hasMessageThat()
.isEqualTo("java.text.ParseException: Invalid duration string: 1000");
}

@Test
Expand All @@ -65,6 +115,7 @@ public void retrieveModeFromLbConfig_nullConfigUseRoundRobin() throws Exception
GrpclbConfig config = (GrpclbConfig) configOrError.getConfig();
assertThat(config.getMode()).isEqualTo(Mode.ROUND_ROBIN);
assertThat(config.getServiceName()).isNull();
assertThat(config.getFallbackTimeoutMs()).isEqualTo(GrpclbState.FALLBACK_TIMEOUT_MS);
}

@Test
Expand All @@ -78,6 +129,7 @@ public void retrieveModeFromLbConfig_emptyConfigUseRoundRobin() throws Exception
GrpclbConfig config = (GrpclbConfig) configOrError.getConfig();
assertThat(config.getMode()).isEqualTo(Mode.ROUND_ROBIN);
assertThat(config.getServiceName()).isNull();
assertThat(config.getFallbackTimeoutMs()).isEqualTo(GrpclbState.FALLBACK_TIMEOUT_MS);
}

@Test
Expand All @@ -91,6 +143,7 @@ public void retrieveModeFromLbConfig_emptyChildPolicyUseRoundRobin() throws Exce
GrpclbConfig config = (GrpclbConfig) configOrError.getConfig();
assertThat(config.getMode()).isEqualTo(Mode.ROUND_ROBIN);
assertThat(config.getServiceName()).isNull();
assertThat(config.getFallbackTimeoutMs()).isEqualTo(GrpclbState.FALLBACK_TIMEOUT_MS);
}

@Test
Expand All @@ -117,6 +170,7 @@ public void retrieveModeFromLbConfig_skipUnsupportedChildPolicy() throws Excepti
GrpclbConfig config = (GrpclbConfig) configOrError.getConfig();
assertThat(config.getMode()).isEqualTo(Mode.PICK_FIRST);
assertThat(config.getServiceName()).isNull();
assertThat(config.getFallbackTimeoutMs()).isEqualTo(GrpclbState.FALLBACK_TIMEOUT_MS);
}

@Test
Expand All @@ -131,6 +185,7 @@ public void retrieveModeFromLbConfig_skipUnsupportedChildPolicyWithTarget() thro
GrpclbConfig config = (GrpclbConfig) configOrError.getConfig();
assertThat(config.getMode()).isEqualTo(Mode.PICK_FIRST);
assertThat(config.getServiceName()).isEqualTo("foo.google.com");
assertThat(config.getFallbackTimeoutMs()).isEqualTo(GrpclbState.FALLBACK_TIMEOUT_MS);
}

@Test
Expand Down
54 changes: 38 additions & 16 deletions grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
Expand Up @@ -1179,24 +1179,34 @@ public void roundRobinMode_subchannelStayTransientFailureUntilReady() {

@Test
public void grpclbFallback_initialTimeout_serverListReceivedBeforeTimerExpires() {
subtestGrpclbFallbackInitialTimeout(false);
subtestGrpclbFallbackTimeout(false, GrpclbState.FALLBACK_TIMEOUT_MS);
}

@Test
public void grpclbFallback_initialTimeout_timerExpires() {
subtestGrpclbFallbackInitialTimeout(true);
subtestGrpclbFallbackTimeout(true, GrpclbState.FALLBACK_TIMEOUT_MS);
}

@Test
public void grpclbFallback_timeout_serverListReceivedBeforeTimerExpires() {
subtestGrpclbFallbackTimeout(false, 12345);
}

@Test
public void grpclbFallback_timeout_timerExpires() {
subtestGrpclbFallbackTimeout(true, 12345);
}

// Fallback or not within the period of the initial timeout.
private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) {
private void subtestGrpclbFallbackTimeout(boolean timerExpires, long timeout) {
long loadReportIntervalMillis = 1983;
InOrder inOrder = inOrder(helper, subchannelPool);

// Create balancer and backend addresses
List<EquivalentAddressGroup> backendList = createResolvedBackendAddresses(2);
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
deliverResolvedAddresses(backendList, grpclbBalancerList);

deliverResolvedAddresses(
backendList, grpclbBalancerList, GrpclbConfig.create(Mode.ROUND_ROBIN, null, timeout));
inOrder.verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)),
eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX));

Expand All @@ -1220,7 +1230,7 @@ private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) {
inOrder.verifyNoMoreInteractions();

assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
fakeClock.forwardTime(GrpclbState.FALLBACK_TIMEOUT_MS - 1, TimeUnit.MILLISECONDS);
fakeClock.forwardTime(timeout - 1, TimeUnit.MILLISECONDS);
assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));

//////////////////////////////////
Expand All @@ -1246,7 +1256,10 @@ private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) {
// Name resolver sends new resolution results without any backend addr
//////////////////////////////////////////////////////////////////////
grpclbBalancerList = createResolvedBalancerAddresses(2);
deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(),grpclbBalancerList);
deliverResolvedAddresses(
Collections.<EquivalentAddressGroup>emptyList(),
grpclbBalancerList,
GrpclbConfig.create(Mode.ROUND_ROBIN, null, timeout));

// New addresses are updated to the OobChannel
inOrder.verify(helper).updateOobChannelAddresses(
Expand Down Expand Up @@ -1276,7 +1289,8 @@ private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) {
subchannelPool.clear();
backendList = createResolvedBackendAddresses(2);
grpclbBalancerList = createResolvedBalancerAddresses(1);
deliverResolvedAddresses(backendList, grpclbBalancerList);
deliverResolvedAddresses(
backendList, grpclbBalancerList, GrpclbConfig.create(Mode.ROUND_ROBIN, null, timeout));

// New LB address is updated to the OobChannel
inOrder.verify(helper).updateOobChannelAddresses(
Expand Down Expand Up @@ -1326,7 +1340,8 @@ private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) {
///////////////////////////////////////////////////////////////
backendList = createResolvedBackendAddresses(1);
grpclbBalancerList = createResolvedBalancerAddresses(1);
deliverResolvedAddresses(backendList, grpclbBalancerList);
deliverResolvedAddresses(
backendList, grpclbBalancerList, GrpclbConfig.create(Mode.ROUND_ROBIN, null, timeout));
// Will not affect the round robin list at all
inOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
Expand Down Expand Up @@ -2142,16 +2157,23 @@ private void subtestShutdownWithoutSubchannel(GrpclbConfig grpclbConfig) {
}

@Test
public void pickFirstMode_fallback() throws Exception {
public void pickFirstMode_defaultTimeout_fallback() throws Exception {
pickFirstModeFallback(GrpclbState.FALLBACK_TIMEOUT_MS);
}

@Test
public void pickFirstMode_serviceConfigTimeout_fallback() throws Exception {
pickFirstModeFallback(12345);
}

private void pickFirstModeFallback(long timeout) throws Exception {
InOrder inOrder = inOrder(helper);

// Name resolver returns balancer and backend addresses
List<EquivalentAddressGroup> backendList = createResolvedBackendAddresses(2);
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
deliverResolvedAddresses(
backendList,
grpclbBalancerList,
GrpclbConfig.create(Mode.PICK_FIRST));
backendList, grpclbBalancerList, GrpclbConfig.create(Mode.PICK_FIRST, null, timeout));

// Attempted to connect to balancer
assertEquals(1, fakeOobChannels.size());
Expand All @@ -2160,7 +2182,7 @@ public void pickFirstMode_fallback() throws Exception {
assertEquals(1, lbRequestObservers.size());

// Fallback timer expires with no response
fakeClock.forwardTime(GrpclbState.FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS);
fakeClock.forwardTime(timeout, TimeUnit.MILLISECONDS);

// Entering fallback mode
inOrder.verify(helper).createSubchannel(createSubchannelArgsCaptor.capture());
Expand Down Expand Up @@ -2401,7 +2423,7 @@ public void switchServiceName() throws Exception {
deliverResolvedAddresses(
Collections.<EquivalentAddressGroup>emptyList(),
grpclbBalancerList,
GrpclbConfig.create(Mode.ROUND_ROBIN, serviceName));
GrpclbConfig.create(Mode.ROUND_ROBIN, serviceName, GrpclbState.FALLBACK_TIMEOUT_MS));

assertEquals(1, fakeOobChannels.size());
ManagedChannel oobChannel = fakeOobChannels.poll();
Expand Down Expand Up @@ -2443,7 +2465,7 @@ public void switchServiceName() throws Exception {
deliverResolvedAddresses(
Collections.<EquivalentAddressGroup>emptyList(),
newGrpclbResolutionList,
GrpclbConfig.create(Mode.ROUND_ROBIN, serviceName));
GrpclbConfig.create(Mode.ROUND_ROBIN, serviceName, GrpclbState.FALLBACK_TIMEOUT_MS));

// GrpclbState will be shutdown, and a new one will be created
assertThat(oobChannel.isShutdown()).isTrue();
Expand Down

0 comments on commit a4fe46c

Please sign in to comment.