grpclb: include fallback reason in error status of failing to fallback #8035

Merged
Changes from 3 commits
69 changes: 53 additions & 16 deletions grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
@@ -95,8 +95,20 @@ final class GrpclbState {
static final Status NO_AVAILABLE_BACKENDS_STATUS =
Status.UNAVAILABLE.withDescription("LoadBalancer responded without any backends");
@VisibleForTesting
static final Status NO_FALLBACK_BACKENDS_FOUND_STATUS =
Status.UNAVAILABLE.withDescription("Unable to fallback, no fallback addresses found");
static final String NO_FALLBACK_BACKENDS_ERROR =
"Unable to fallback, no fallback addresses found";
@VisibleForTesting
static final Status BALANCER_TIMEOUT_STATUS =
Status.UNAVAILABLE.withDescription("Timeout waiting for remote balancer");
@VisibleForTesting
static final Status BALANCER_REQUESTED_FALLBACK_STATUS =
Status.UNAVAILABLE.withDescription("Fallback requested by balancer");
// This error status should never be propagated to RPC failures, as "no backend or balancer
// addresses found" should be directly handled as a name resolution error. So in cases of no
// balancer address, fallback should never fail.
private static final Status NO_LB_ADDRESS_PROVIDED_STATUS =
Status.UNAVAILABLE.withDescription("No balancer address found");


@VisibleForTesting
static final RoundRobinEntry BUFFER_ENTRY = new RoundRobinEntry() {
@@ -137,6 +149,10 @@ enum Mode {
private ScheduledHandle fallbackTimer;
private List<EquivalentAddressGroup> fallbackBackendList = Collections.emptyList();
private boolean usingFallbackBackends;
// Reason for falling back; used as the RPC error message if fallback fails (e.g., no
// fallback addresses found).
@Nullable
private Status fallbackReason;
// True if the current balancer has returned a serverlist. Will be reset to false when
// the connection to a balancer is lost.
private boolean balancerWorking;
@@ -239,7 +255,7 @@ void handleAddresses(
// No balancer address: close existing balancer connection and enter fallback mode
// immediately.
shutdownLbComm();
syncContext.execute(new FallbackModeTask());
syncContext.execute(new FallbackModeTask(NO_LB_ADDRESS_PROVIDED_STATUS));
} else {
startLbComm(newLbAddressGroups);
// Avoid creating a new RPC just because the addresses were updated, as it can cause a
@@ -253,7 +269,8 @@
// Start the fallback timer if it's never started
if (fallbackTimer == null) {
fallbackTimer = syncContext.schedule(
new FallbackModeTask(), FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS, timerService);
new FallbackModeTask(BALANCER_TIMEOUT_STATUS), FALLBACK_TIMEOUT_MS,
TimeUnit.MILLISECONDS, timerService);
}
}
fallbackBackendList = newBackendServers;
@@ -275,16 +292,21 @@ void requestConnection() {
}

private void maybeUseFallbackBackends() {
if (balancerWorking) {
return;
}
if (usingFallbackBackends) {
if (balancerWorking || usingFallbackBackends) {
return;
}
// Balancer RPC should have either been broken or timed out.
checkState(fallbackReason != null, "no reason to fallback");
for (Subchannel subchannel : subchannels.values()) {
if (subchannel.getAttributes().get(STATE_INFO).get().getState() == READY) {
ConnectivityStateInfo stateInfo = subchannel.getAttributes().get(STATE_INFO).get();
if (stateInfo.getState() == READY) {
return;
}
// If we do have balancer-provided backends, use one of their errors in the error
// message if fallback fails.
if (stateInfo.getState() == TRANSIENT_FAILURE) {
fallbackReason = stateInfo.getStatus();
}
}
// Fallback conditions met
useFallbackBackends();
@@ -528,8 +550,17 @@ public void onSubchannelState(ConnectivityStateInfo newState) {

@VisibleForTesting
class FallbackModeTask implements Runnable {
private final Status reason;

private FallbackModeTask(Status reason) {
this.reason = reason;
}

@Override
public void run() {
// Timer should have been cancelled if we entered fallback early.
checkState(!usingFallbackBackends, "already in fallback");
fallbackReason = reason;
maybeUseFallbackBackends();
maybeUpdatePicker();
}
@@ -658,7 +689,9 @@ private void handleResponse(LoadBalanceResponse response) {
}

if (typeCase == LoadBalanceResponseTypeCase.FALLBACK_RESPONSE) {
// Force entering fallback, as requested by the balancer.
cancelFallbackTimer();
fallbackReason = BALANCER_REQUESTED_FALLBACK_STATUS;
useFallbackBackends();
maybeUpdatePicker();
return;
@@ -701,8 +734,9 @@ private void handleResponse(LoadBalanceResponse response) {
newBackendAddrList.add(new BackendAddressGroup(eag, token));
}
}
// Stop using fallback backends as soon as a new server list is received from the balancer.
// Exit fallback as soon as a new server list is received from the balancer.
usingFallbackBackends = false;
fallbackReason = null;
cancelFallbackTimer();
updateServerList(newDropList, newBackendAddrList, loadRecorder);
maybeUpdatePicker();
@@ -717,6 +751,8 @@ private void handleStreamClosed(Status error) {
cleanUp();
propagateError(error);
balancerWorking = false;
fallbackReason = error;
Member: This may not be UNAVAILABLE. We need to create a new Status.

Contributor Author: What about the propagateError(error) two lines above? I was considering deleting that line. It fails RPCs for a short time window between the balancer RPC closing and fallback being attempted. Right after fallback is attempted, if fallback fails, RPCs switch to failing with fallbackReason (the same status as the balancer failure, plus a "failed to fallback" message).

So I am wondering if we should remove the propagateError(error) line here and fail RPCs with a single status, after attempting fallback.

Member: propagateError() is called in two places. One of them isn't what it seems: InetAddress.getByAddress() only throws UnknownHostException "if IP address is of illegal length", so the error string "Host for server not found" is wrong.

propagateError() does two things: log and adjust the picker. For logging, we really want to log the original Status, so error here. But we can't use error directly for the picker, even if only for a short period of time.

> So I am wondering if we should remove the propagateError(error) line here and fail RPCs with a single status, after attempting fallback.

That's a functional change, as you would no longer cause failures if fallback succeeds. I don't think we should choose the behavior based on what makes the implementation easiest; we want it to behave a certain way in this case. I thought grpclb was supposed to try fallback before failing RPCs, at least when starting up, but I honestly don't know where to look up the expected behavior in this case.

Calling @markdroth to help inform us of when gRPC-LB should begin failing RPCs.

Member (@markdroth):

I don't have enough context here to know which specific cases you're asking about.

In general, there are two types of grpclb fallback, fallback at startup and fallback after startup.

Fallback at startup is triggered in the following cases:

  • When the fallback timer fires before we have received the first response from the balancer.
  • When the balancer channel goes into TRANSIENT_FAILURE before reaching READY. (This short-circuits the fallback timer.)
  • When the balancer call finishes (regardless of status) without receiving the first response from the balancer. (This short-circuits the fallback timer.)

Fallback after startup occurs only after we receive an initial response from the balancer. It is triggered in the following cases:

  • When we get an explicit response from the balancer telling us go into fallback.
  • When both of the following are true:
    • The balancer call has finished (regardless of status) and we have not yet received the first response on the subsequent call.
    • We cannot connect to any of the backends in the last response we received from the balancer.

None of these cases have anything to do with the status of individual data plane calls. However, there are two cases above where fallback is triggered by receiving status on the balancer call, but only when other conditions are also met.
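
Expressed as code, those triggers amount to roughly the following sketch (all names are hypothetical, not the actual GrpclbState implementation):

```java
// Sketch of the fallback triggers described above; every name here is
// illustrative, not the actual grpclb implementation.
final class FallbackTriggerSketch {
  boolean receivedFirstBalancerResponse; // initial response seen on current call
  boolean fallbackTimerFired;
  boolean balancerChannelInTransientFailure; // before ever reaching READY
  boolean balancerCallFinished;        // finished with any status
  boolean balancerRequestedFallback;   // explicit fallback response received
  boolean anyBalancerBackendReachable; // from the last received server list

  // Fallback at startup: only before the first balancer response.
  boolean fallbackAtStartup() {
    return !receivedFirstBalancerResponse
        && (fallbackTimerFired
            || balancerChannelInTransientFailure // short-circuits the timer
            || balancerCallFinished);            // short-circuits the timer
  }

  // Fallback after startup: only after an initial balancer response.
  boolean fallbackAfterStartup() {
    return receivedFirstBalancerResponse
        && (balancerRequestedFallback
            || (balancerCallFinished && !anyBalancerBackendReachable));
  }
}
```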

Contributor Author: This still doesn't directly answer whether we should fail RPCs before trying fallback. The specific case we are talking about: the balancer RPC finishes (regardless of status) and none of the connections to the previously received backends has been READY. Do we fail RPCs immediately while trying to use fallback addresses (implying RPCs may succeed again if connecting to the fallback backends succeeds)? Or do we wait until fallback has been attempted?

Member (@markdroth):

In the fallback-at-startup case, we should be in state CONNECTING until we either get connected or go into fallback mode, so we should not fail data plane RPCs until one of those two things happens.

In the fallback-after-startup case, the "get an explicit response from the balancer telling us to go into fallback" case should not depend on whether there are currently any READY connections to balancer-given backends, since it's intended to force clients to go to fallback regardless of whether they are currently connected to backends, and you should fix your implementation if it's not doing that. Given that, there are several cases here:

  • If we can't reach any of the balancer-provided backends before we go into fallback mode (e.g., if the backend connections fail before either the balancer connection fails or the balancer explicitly tells us to go into fallback), then we will fail some data plane RPCs.
  • If we are in contact with the balancer-provided backends and the balancer tells us to go into fallback mode, we should not fail any RPCs; we should keep using the balancer-provided backends while we get in contact with the fallback backends.
  • If we are in contact with the balancer-provided backends and the balancer call fails, and then we lose contact with the balancer-provided backends, it's a bit of a grey area. In principle, I suppose we should go into state CONNECTING here and queue data plane RPCs instead of failing them, but if we actually fail some RPCs instead, I think we can probably live with that.

@apolcyn may want to weigh in here as well.
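
Condensed into code, the expected behavior described above might look like this sketch (hypothetical names; a summary, not actual gRPC code):

```java
// Sketch of the expected data-plane behavior described above; the enum and
// method names are illustrative, not part of gRPC.
enum DataPlaneAction { QUEUE_RPCS, FAIL_RPCS, USE_CURRENT_BACKENDS }

final class ExpectedBehaviorSketch {
  // Fallback at startup: stay CONNECTING (RPCs queue, not fail) until we
  // either get connected or enter fallback mode.
  static DataPlaneAction atStartup() {
    return DataPlaneAction.QUEUE_RPCS;
  }

  // Fallback after startup, with backends from the last balancer response.
  static DataPlaneAction afterStartup(
      boolean backendsReachable, boolean balancerRequestedFallback) {
    if (balancerRequestedFallback && backendsReachable) {
      // Keep using balancer-provided backends while contacting fallback ones.
      return DataPlaneAction.USE_CURRENT_BACKENDS;
    }
    if (!backendsReachable) {
      // Backend connections failed before fallback kicked in: some RPCs may
      // fail. (The balancer-call-failed-then-backends-lost case is a grey
      // area: CONNECTING in principle, but a few failed RPCs are tolerable.)
      return DataPlaneAction.FAIL_RPCS;
    }
    return DataPlaneAction.QUEUE_RPCS;
  }
}
```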

Contributor:

+1 to everything @markdroth just described.

Also note that go/grpclb-explicit-fallback describes the expected behavior of clients when receiving a fallback response from a balancer.

Member:
> That line fails RPCs for a short time window between balancer RPC closed and trying fallback.

I just realized that sounds similar to b/138458426. I had found a path through the code that could cause that, but #6657 looked like it'd fix it. Maybe there was a second path through the code? And apparently Go might still have this problem?

Contributor Author: From the description in b/138458426#comment4 — "the client enters transient failure because all subchannels are 'connecting', and one has entered 'transient failure', so the pending pick fails" — I'd suspect that was due to the issue described in #7959, which was fixed recently.

Contributor Author:
> In the fallback-after-startup case, the "get an explicit response from the balancer telling us to go into fallback" case should not depend on whether there are currently any READY connections to balancer-given backends, since it's intended to force clients to go to fallback regardless of whether they are currently connected to backends, and you should fix your implementation if it's not doing that.

Sorry, what I mentioned in #8035 (comment) was wrong. The balancer forcing the client into fallback is handled correctly: it stops using balancer-provided backends immediately, even if there are READY connections.

Our implementation actually looks fine for handling the grey area:

  • If connections to all balancer-provided backends fail before the balancer RPC breaks: client RPCs fail with the status from one of the broken subchannels. After the balancer RPC fails, and before fallback is attempted, the status used to fail client RPCs changes to that of the balancer RPC.
  • If the balancer RPC fails before the connections to all balancer-provided backends break: client RPCs do not fail until the latter happens. After connections to all balancer-provided backends fail, and before fallback is attempted, the status used to fail client RPCs is from one of the broken subchannels.
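
In sketch form, that precedence looks roughly like this (hypothetical class; the real logic is the fallbackReason handling in handleStreamClosed() and maybeUseFallbackBackends() in this PR):

```java
import io.grpc.Status;

// Sketch of the fallback-reason precedence described above; the class and
// method names are hypothetical, mirroring this PR's fallbackReason handling.
final class FallbackReasonSketch {
  private Status fallbackReason;

  // Balancer stream broke: remember its status as the provisional reason.
  void onBalancerStreamClosed(Status error) {
    fallbackReason = error;
  }

  // A TRANSIENT_FAILURE status from a balancer-provided backend subchannel
  // takes precedence while checking whether fallback is needed.
  void onSubchannelTransientFailure(Status subchannelStatus) {
    fallbackReason = subchannelStatus;
  }

  // If fallback itself fails (no fallback addresses found), client RPCs fail
  // with the remembered reason plus the "unable to fallback" note.
  Status failedFallbackStatus() {
    return fallbackReason.augmentDescription(
        "Unable to fallback, no fallback addresses found");
  }
}
```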

cancelFallbackTimer();
maybeUseFallbackBackends();
maybeUpdatePicker();

@@ -773,15 +809,16 @@ private void maybeUpdatePicker() {
List<RoundRobinEntry> pickList;
ConnectivityState state;
if (backendList.isEmpty()) {
if (balancerWorking) {
pickList =
Collections.<RoundRobinEntry>singletonList(
new ErrorEntry(NO_AVAILABLE_BACKENDS_STATUS));
// Note the balancer (even while working) may force use of fallback backends, and that
// fallback may fail. So check whether we are currently in fallback first.
if (usingFallbackBackends) {
pickList = Collections.<RoundRobinEntry>singletonList(new ErrorEntry(
fallbackReason.augmentDescription(NO_FALLBACK_BACKENDS_ERROR)));
state = TRANSIENT_FAILURE;
} else if (usingFallbackBackends) {
} else if (balancerWorking) {
pickList =
Collections.<RoundRobinEntry>singletonList(
new ErrorEntry(NO_FALLBACK_BACKENDS_FOUND_STATUS));
new ErrorEntry(NO_AVAILABLE_BACKENDS_STATUS));
state = TRANSIENT_FAILURE;
} else { // still waiting for balancer
pickList = Collections.singletonList(BUFFER_ENTRY);
142 changes: 137 additions & 5 deletions grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
@@ -1284,11 +1284,16 @@ private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) {
for (Subchannel subchannel : mockSubchannels) {
verify(subchannelPool).returnSubchannel(eq(subchannel), any(ConnectivityStateInfo.class));
}

// RPC error status includes message of balancer RPC timeout
inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker.dropList).isEmpty();
assertThat(picker.pickList)
.containsExactly(new ErrorEntry(GrpclbState.NO_FALLBACK_BACKENDS_FOUND_STATUS));
PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
assertThat(result.getStatus().getCode())
.isEqualTo(GrpclbState.BALANCER_TIMEOUT_STATUS.getCode());
assertThat(result.getStatus().getDescription())
.startsWith(GrpclbState.BALANCER_TIMEOUT_STATUS.getDescription());
assertThat(result.getStatus().getDescription())
.contains(GrpclbState.NO_FALLBACK_BACKENDS_ERROR);
}

////////////////////////////////////////////////////////////////
@@ -1396,6 +1401,9 @@ public void grpclbFallback_breakLbStreamBeforeFallbackTimerExpires() {
Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken");
lbResponseObserver.onError(streamError.asException());

// Fallback timer has been short-circuited
assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));

// Fall back to the backends from resolver
fallbackTestVerifyUseOfFallbackBackendLists(
inOrder, Arrays.asList(backendList.get(0), backendList.get(1)));
@@ -1408,6 +1416,24 @@
eq(LoadBalanceRequest.newBuilder().setInitialRequest(
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
.build()));

//////////////////////////////////////////////////////////////////////
// Name resolver sends new resolution results without any backend addr
//////////////////////////////////////////////////////////////////////
deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList);

// Still in fallback logic, except that the backend list is empty
for (Subchannel subchannel : mockSubchannels) {
verify(subchannelPool).returnSubchannel(eq(subchannel), any(ConnectivityStateInfo.class));
}

// RPC error status includes error of balancer stream
inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
assertThat(result.getStatus().getCode()).isEqualTo(streamError.getCode());
assertThat(result.getStatus().getDescription()).startsWith(streamError.getDescription());
assertThat(result.getStatus().getDescription())
.contains(GrpclbState.NO_FALLBACK_BACKENDS_ERROR);
}

@Test
@@ -1434,6 +1460,24 @@ public void grpclbFallback_noBalancerAddress() {
assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
verify(helper, never())
.createOobChannel(ArgumentMatchers.<EquivalentAddressGroup>anyList(), anyString());
logs.clear();

///////////////////////////////////////////////////////////////////////////////////////
// Name resolver sends new resolution results without any backend addr or balancer addr
///////////////////////////////////////////////////////////////////////////////////////
deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(),
Collections.<EquivalentAddressGroup>emptyList());
assertThat(logs).containsExactly(
"DEBUG: [grpclb-<api.google.com>] Error: Status{code=UNAVAILABLE, "
+ "description=No backend or balancer addresses found, cause=null}");

// Keep using existing fallback addresses without interruption
for (Subchannel subchannel : mockSubchannels) {
verify(subchannelPool, never())
.returnSubchannel(eq(subchannel), any(ConnectivityStateInfo.class));
}
verify(helper, never())
.updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));
}

@Test
@@ -1531,6 +1575,7 @@ private void subtestGrpclbFallbackConnectionLost(
}
assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));

// No subchannels to fallback backends should have been created if fallback did not happen
if (!(balancerBroken && allSubchannelsBroken)) {
verify(subchannelPool, never()).takeOrCreateSubchannel(
eq(backendList.get(0)), any(Attributes.class));
Expand All @@ -1539,6 +1584,72 @@ private void subtestGrpclbFallbackConnectionLost(
}
}

@Test
public void grpclbFallback_allLost_failToFallback() {
long loadReportIntervalMillis = 1983;
InOrder inOrder = inOrder(helper, mockLbService, subchannelPool);

// Create balancer and (empty) backend addresses
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList);

inOrder.verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)),
eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX));

// Attempted to connect to balancer
assertEquals(1, fakeOobChannels.size());
fakeOobChannels.poll();
inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();

verify(lbRequestObserver).onNext(
eq(LoadBalanceRequest.newBuilder().setInitialRequest(
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
.build()));
lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis));
// We don't care if these methods have been run.
inOrder.verify(helper, atLeast(0)).getSynchronizationContext();
inOrder.verify(helper, atLeast(0)).getScheduledExecutorService();

inOrder.verifyNoMoreInteractions();

// Balancer returns a server list
List<ServerEntry> serverList = Arrays.asList(
new ServerEntry("127.0.0.1", 2000, "token0001"),
new ServerEntry("127.0.0.1", 2010, "token0002"));
lbResponseObserver.onNext(buildInitialResponse());
lbResponseObserver.onNext(buildLbResponse(serverList));

List<Subchannel> subchannels = fallbackTestVerifyUseOfBalancerBackendLists(inOrder, serverList);

// Break connections
lbResponseObserver.onError(Status.UNAVAILABLE.asException());
// A new stream to LB is created
inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
assertEquals(1, lbRequestObservers.size());

// Break all subchannel connections
Status error = Status.UNAUTHENTICATED.withDescription("Permission denied");
for (Subchannel subchannel : subchannels) {
deliverSubchannelState(subchannel, ConnectivityStateInfo.forTransientFailure(error));
}

// Recycle all subchannels
for (Subchannel subchannel : subchannels) {
verify(subchannelPool).returnSubchannel(eq(subchannel), any(ConnectivityStateInfo.class));
}

// RPC error status includes errors of subchannels to balancer-provided backends
inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
assertThat(result.getStatus().getCode()).isEqualTo(error.getCode());
assertThat(result.getStatus().getDescription()).startsWith(error.getDescription());
assertThat(result.getStatus().getDescription())
.contains(GrpclbState.NO_FALLBACK_BACKENDS_ERROR);
}

private List<Subchannel> fallbackTestVerifyUseOfFallbackBackendLists(
InOrder inOrder, List<EquivalentAddressGroup> addrs) {
return fallbackTestVerifyUseOfBackendLists(inOrder, addrs, null);
@@ -1958,6 +2069,7 @@ public void grpclbWorking_pickFirstMode_lbSendsEmptyAddress() throws Exception {
assertThat(mockSubchannels).isEmpty();
verify(subchannel).shutdown();

// RPC error status includes message of no backends provided by balancer
inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
RoundRobinPicker errorPicker = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(errorPicker.pickList)
@@ -2445,7 +2557,7 @@ public void grpclbWorking_lbSendsFallbackMessage() {
new BackendEntry(subchannel2, getLoadRecorder(), "token0002"))
.inOrder();

// enter fallback mode
// Balancer forces entering fallback mode
lbResponseObserver.onNext(buildLbFallbackResponse());

// existing subchannels must be returned immediately to gracefully shutdown.
@@ -2460,6 +2572,26 @@
assertFalse(oobChannel.isShutdown());
verify(lbRequestObserver, never()).onCompleted();

//////////////////////////////////////////////////////////////////////
// Name resolver sends new resolution results without any backend addr
//////////////////////////////////////////////////////////////////////
deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList);

// Still in fallback logic, except that the backend list is empty
for (Subchannel subchannel : mockSubchannels) {
verify(subchannelPool).returnSubchannel(eq(subchannel), any(ConnectivityStateInfo.class));
}

// RPC error status includes message of fallback requested by balancer
inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
assertThat(result.getStatus().getCode())
.isEqualTo(GrpclbState.BALANCER_REQUESTED_FALLBACK_STATUS.getCode());
assertThat(result.getStatus().getDescription())
.startsWith(GrpclbState.BALANCER_REQUESTED_FALLBACK_STATUS.getDescription());
assertThat(result.getStatus().getDescription())
.contains(GrpclbState.NO_FALLBACK_BACKENDS_ERROR);

// exit fallback by providing two new backends
ServerEntry backend2a = new ServerEntry("127.0.0.1", 8000, "token1001");
ServerEntry backend2b = new ServerEntry("127.0.0.1", 8010, "token1002");