Skip to content

Commit

Permalink
Enhance error information reflected by RPC status when failing to fal…
Browse files Browse the repository at this point in the history
…lback (aka, no fallback addresses provided by resolver), by including the original cause of entering fallback. This falls into cases:

  - balancer RPC timeout (includes a timeout message)
  - balancer RPC failed before receiving any backend addresses (use the error occured in balancer RPC)
  - all balancer-provided addresses failed, while balancer RPC had failed causing fallback (use the error status for one of the balancer-provided backend)
  • Loading branch information
voidzcy committed Apr 1, 2021
1 parent 9fc32f1 commit 51142fc
Show file tree
Hide file tree
Showing 2 changed files with 185 additions and 19 deletions.
63 changes: 49 additions & 14 deletions grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
Expand Up @@ -95,8 +95,20 @@ final class GrpclbState {
static final Status NO_AVAILABLE_BACKENDS_STATUS =
Status.UNAVAILABLE.withDescription("LoadBalancer responded without any backends");
@VisibleForTesting
static final Status NO_FALLBACK_BACKENDS_FOUND_STATUS =
Status.UNAVAILABLE.withDescription("Unable to fallback, no fallback addresses found");
static final String NO_FALLBACK_BACKENDS_ERROR =
"Unable to fallback, no fallback addresses found";
@VisibleForTesting
static final Status BALANCER_TIMEOUT_STATUS =
Status.UNAVAILABLE.withDescription("Timeout waiting for remote balancer");
@VisibleForTesting
static final Status BALANCER_REQUESTED_FALLBACK_STATUS =
Status.UNAVAILABLE.withDescription("Fallback requested by balancer");
// This error status should never be propagated to RPC failures, as "no backend or balancer
// addresses found" should be directly handled as a name resolution error. So in cases of no
// balancer address, fallback should never fail.
private static final Status NO_LB_ADDRESS_PROVIDED_STATUS =
Status.UNAVAILABLE.withDescription("No balancer address found");


@VisibleForTesting
static final RoundRobinEntry BUFFER_ENTRY = new RoundRobinEntry() {
Expand Down Expand Up @@ -137,6 +149,10 @@ enum Mode {
private ScheduledHandle fallbackTimer;
private List<EquivalentAddressGroup> fallbackBackendList = Collections.emptyList();
private boolean usingFallbackBackends;
// Reason to fallback, will be used as RPC's error message if fail to fallback (e.g., no
// fallback addresses found).
@Nullable
private Status fallbackReason;
// True if the current balancer has returned a serverlist. Will be reset to false when lost
// connection to a balancer.
private boolean balancerWorking;
Expand Down Expand Up @@ -239,6 +255,7 @@ void handleAddresses(
// No balancer address: close existing balancer connection and enter fallback mode
// immediately.
shutdownLbComm();
fallbackReason = NO_LB_ADDRESS_PROVIDED_STATUS;
syncContext.execute(new FallbackModeTask());
} else {
startLbComm(newLbAddressGroups);
Expand All @@ -252,6 +269,7 @@ void handleAddresses(
}
// Start the fallback timer if it's never started
if (fallbackTimer == null) {
fallbackReason = BALANCER_TIMEOUT_STATUS;
fallbackTimer = syncContext.schedule(
new FallbackModeTask(), FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS, timerService);
}
Expand All @@ -275,16 +293,21 @@ void requestConnection() {
}

private void maybeUseFallbackBackends() {
if (balancerWorking) {
return;
}
if (usingFallbackBackends) {
if (balancerWorking || usingFallbackBackends) {
return;
}
// Balancer RPC should have either been broken or timed out.
checkState(fallbackReason != null, "no reason to fallback");
for (Subchannel subchannel : subchannels.values()) {
if (subchannel.getAttributes().get(STATE_INFO).get().getState() == READY) {
ConnectivityStateInfo stateInfo = subchannel.getAttributes().get(STATE_INFO).get();
if (stateInfo.getState() == READY) {
return;
}
// If we do have balancer-provided backends, use one of its error in the error message if
// fail to fallback.
if (stateInfo.getState() == TRANSIENT_FAILURE) {
fallbackReason = stateInfo.getStatus();
}
}
// Fallback conditions met
useFallbackBackends();
Expand Down Expand Up @@ -658,7 +681,9 @@ private void handleResponse(LoadBalanceResponse response) {
}

if (typeCase == LoadBalanceResponseTypeCase.FALLBACK_RESPONSE) {
// Force entering fallback requested by balancer.
cancelFallbackTimer();
fallbackReason = BALANCER_REQUESTED_FALLBACK_STATUS;
useFallbackBackends();
maybeUpdatePicker();
return;
Expand Down Expand Up @@ -701,8 +726,9 @@ private void handleResponse(LoadBalanceResponse response) {
newBackendAddrList.add(new BackendAddressGroup(eag, token));
}
}
// Stop using fallback backends as soon as a new server list is received from the balancer.
// Exit fallback as soon as a new server list is received from the balancer.
usingFallbackBackends = false;
fallbackReason = null;
cancelFallbackTimer();
updateServerList(newDropList, newBackendAddrList, loadRecorder);
maybeUpdatePicker();
Expand All @@ -717,6 +743,7 @@ private void handleStreamClosed(Status error) {
cleanUp();
propagateError(error);
balancerWorking = false;
fallbackReason = error;
maybeUseFallbackBackends();
maybeUpdatePicker();

Expand Down Expand Up @@ -773,15 +800,16 @@ private void maybeUpdatePicker() {
List<RoundRobinEntry> pickList;
ConnectivityState state;
if (backendList.isEmpty()) {
if (balancerWorking) {
pickList =
Collections.<RoundRobinEntry>singletonList(
new ErrorEntry(NO_AVAILABLE_BACKENDS_STATUS));
// Note balancer (is working) may enforce using fallback backends, and that fallback may
// fail. So we should check if currently in fallback first.
if (usingFallbackBackends) {
pickList = Collections.<RoundRobinEntry>singletonList(new ErrorEntry(
fallbackReason.augmentDescription(NO_FALLBACK_BACKENDS_ERROR)));
state = TRANSIENT_FAILURE;
} else if (usingFallbackBackends) {
} else if (balancerWorking) {
pickList =
Collections.<RoundRobinEntry>singletonList(
new ErrorEntry(NO_FALLBACK_BACKENDS_FOUND_STATUS));
new ErrorEntry(NO_AVAILABLE_BACKENDS_STATUS));
state = TRANSIENT_FAILURE;
} else { // still waiting for balancer
pickList = Collections.singletonList(BUFFER_ENTRY);
Expand Down Expand Up @@ -1119,4 +1147,11 @@ public String toString() {
.toString();
}
}

private enum BalancerState {
// Using backends provided by remote balancer.
BALANCER,
// Using backends in the fallback backend list.
FALLBACK
}
}
141 changes: 136 additions & 5 deletions grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
Expand Up @@ -1284,11 +1284,16 @@ private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) {
for (Subchannel subchannel : mockSubchannels) {
verify(subchannelPool).returnSubchannel(eq(subchannel), any(ConnectivityStateInfo.class));
}

// RPC error status includes message of balancer RPC timeout
inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker.dropList).isEmpty();
assertThat(picker.pickList)
.containsExactly(new ErrorEntry(GrpclbState.NO_FALLBACK_BACKENDS_FOUND_STATUS));
PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
assertThat(result.getStatus().getCode())
.isEqualTo(GrpclbState.BALANCER_TIMEOUT_STATUS.getCode());
assertThat(result.getStatus().getDescription())
.startsWith(GrpclbState.BALANCER_TIMEOUT_STATUS.getDescription());
assertThat(result.getStatus().getDescription())
.contains(GrpclbState.NO_FALLBACK_BACKENDS_ERROR);
}

////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -1408,6 +1413,24 @@ public void grpclbFallback_breakLbStreamBeforeFallbackTimerExpires() {
eq(LoadBalanceRequest.newBuilder().setInitialRequest(
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
.build()));

//////////////////////////////////////////////////////////////////////
// Name resolver sends new resolution results without any backend addr
//////////////////////////////////////////////////////////////////////
deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList);

// Still in fallback logic, except that the backend list is empty
for (Subchannel subchannel : mockSubchannels) {
verify(subchannelPool).returnSubchannel(eq(subchannel), any(ConnectivityStateInfo.class));
}

// RPC error status includes error of balancer stream
inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
assertThat(result.getStatus().getCode()).isEqualTo(streamError.getCode());
assertThat(result.getStatus().getDescription()).startsWith(streamError.getDescription());
assertThat(result.getStatus().getDescription())
.contains(GrpclbState.NO_FALLBACK_BACKENDS_ERROR);
}

@Test
Expand All @@ -1434,6 +1457,24 @@ public void grpclbFallback_noBalancerAddress() {
assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
verify(helper, never())
.createOobChannel(ArgumentMatchers.<EquivalentAddressGroup>anyList(), anyString());
logs.clear();

///////////////////////////////////////////////////////////////////////////////////////
// Name resolver sends new resolution results without any backend addr or balancer addr
///////////////////////////////////////////////////////////////////////////////////////
deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(),
Collections.<EquivalentAddressGroup>emptyList());
assertThat(logs).containsExactly(
"DEBUG: [grpclb-<api.google.com>] Error: Status{code=UNAVAILABLE, "
+ "description=No backend or balancer addresses found, cause=null}");

// Keep using existing fallback addresses without interruption
for (Subchannel subchannel : mockSubchannels) {
verify(subchannelPool, never())
.returnSubchannel(eq(subchannel), any(ConnectivityStateInfo.class));
}
verify(helper, never())
.updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));
}

@Test
Expand Down Expand Up @@ -1531,6 +1572,7 @@ private void subtestGrpclbFallbackConnectionLost(
}
assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));

// No subchannel to fallback backends should have been created if no fallback happened
if (!(balancerBroken && allSubchannelsBroken)) {
verify(subchannelPool, never()).takeOrCreateSubchannel(
eq(backendList.get(0)), any(Attributes.class));
Expand All @@ -1539,6 +1581,74 @@ private void subtestGrpclbFallbackConnectionLost(
}
}

@Test
public void grpclbFallback_allLost_failToFallback() {
long loadReportIntervalMillis = 1983;
InOrder inOrder = inOrder(helper, mockLbService, subchannelPool);

// Create balancer and (empty) backend addresses
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList);

inOrder.verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)),
eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX));

// Attempted to connect to balancer
assertEquals(1, fakeOobChannels.size());
fakeOobChannels.poll();
inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();

verify(lbRequestObserver).onNext(
eq(LoadBalanceRequest.newBuilder().setInitialRequest(
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
.build()));
lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis));
// We don't care if these methods have been run.
inOrder.verify(helper, atLeast(0)).getSynchronizationContext();
inOrder.verify(helper, atLeast(0)).getScheduledExecutorService();

inOrder.verifyNoMoreInteractions();

// Balancer returns a server list
List<ServerEntry> serverList = Arrays.asList(
new ServerEntry("127.0.0.1", 2000, "token0001"),
new ServerEntry("127.0.0.1", 2010, "token0002"));
lbResponseObserver.onNext(buildInitialResponse());
lbResponseObserver.onNext(buildLbResponse(serverList));

List<Subchannel> subchannels = fallbackTestVerifyUseOfBalancerBackendLists(inOrder, serverList);

// Break connections
lbResponseObserver.onError(Status.UNAVAILABLE.asException());
// A new stream to LB is created
inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
lbRequestObserver = lbRequestObservers.poll();

// Break all subchannel connections
Status error = Status.UNAUTHENTICATED.withDescription("Permission denied");
for (Subchannel subchannel : subchannels) {
deliverSubchannelState(subchannel, ConnectivityStateInfo.forTransientFailure(error));
}

// Recycle all subchannels
for (Subchannel subchannel : subchannels) {
verify(subchannelPool).returnSubchannel(eq(subchannel), any(ConnectivityStateInfo.class));
}

// RPC error status includes errors of subchannels to balancer-provided backends
inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
assertThat(result.getStatus().getCode()).isEqualTo(error.getCode());
assertThat(result.getStatus().getDescription()).startsWith(error.getDescription());
assertThat(result.getStatus().getDescription())
.contains(GrpclbState.NO_FALLBACK_BACKENDS_ERROR);
}

private List<Subchannel> fallbackTestVerifyUseOfFallbackBackendLists(
InOrder inOrder, List<EquivalentAddressGroup> addrs) {
return fallbackTestVerifyUseOfBackendLists(inOrder, addrs, null);
Expand Down Expand Up @@ -1958,6 +2068,7 @@ public void grpclbWorking_pickFirstMode_lbSendsEmptyAddress() throws Exception {
assertThat(mockSubchannels).isEmpty();
verify(subchannel).shutdown();

// RPC error status includes message of no backends provided by balancer
inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
RoundRobinPicker errorPicker = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(errorPicker.pickList)
Expand Down Expand Up @@ -2445,7 +2556,7 @@ public void grpclbWorking_lbSendsFallbackMessage() {
new BackendEntry(subchannel2, getLoadRecorder(), "token0002"))
.inOrder();

// enter fallback mode
// Balancer forces entering fallback mode
lbResponseObserver.onNext(buildLbFallbackResponse());

// existing subchannels must be returned immediately to gracefully shutdown.
Expand All @@ -2460,6 +2571,26 @@ public void grpclbWorking_lbSendsFallbackMessage() {
assertFalse(oobChannel.isShutdown());
verify(lbRequestObserver, never()).onCompleted();

//////////////////////////////////////////////////////////////////////
// Name resolver sends new resolution results without any backend addr
//////////////////////////////////////////////////////////////////////
deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList);

// Still in fallback logic, except that the backend list is empty
for (Subchannel subchannel : mockSubchannels) {
verify(subchannelPool).returnSubchannel(eq(subchannel), any(ConnectivityStateInfo.class));
}

// RPC error status includes message of fallback requested by balancer
inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
assertThat(result.getStatus().getCode())
.isEqualTo(GrpclbState.BALANCER_REQUESTED_FALLBACK_STATUS.getCode());
assertThat(result.getStatus().getDescription())
.startsWith(GrpclbState.BALANCER_REQUESTED_FALLBACK_STATUS.getDescription());
assertThat(result.getStatus().getDescription())
.contains(GrpclbState.NO_FALLBACK_BACKENDS_ERROR);

// exit fall back by providing two new backends
ServerEntry backend2a = new ServerEntry("127.0.0.1", 8000, "token1001");
ServerEntry backend2b = new ServerEntry("127.0.0.1", 8010, "token1002");
Expand Down

0 comments on commit 51142fc

Please sign in to comment.