Skip to content

Commit

Permalink
core: Sticky TRANSIENT_FAILURE in PickFirstLoadBalancer (#10106)
Browse files Browse the repository at this point in the history
When the subchannel is transitioning from TRANSIENT_FAILURE to either
IDLE or CONNECTING we will not update the LB state. Additionally, if
the subchannel becomes idle we request a new connection so that the
subchannel will keep on trying to establish a connection.
  • Loading branch information
temawi committed Apr 27, 2023
1 parent c5b825a commit fbc8679
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 13 deletions.
37 changes: 29 additions & 8 deletions core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
final class PickFirstLoadBalancer extends LoadBalancer {
private final Helper helper;
private Subchannel subchannel;
private ConnectivityState currentState = IDLE;

PickFirstLoadBalancer(Helper helper) {
this.helper = checkNotNull(helper, "helper");
Expand Down Expand Up @@ -69,7 +70,7 @@ public void onSubchannelState(ConnectivityStateInfo stateInfo) {

// The channel state does not get updated when doing name resolving today, so for the moment
// let LB report CONNECTION and call subchannel.requestConnection() immediately.
helper.updateBalancingState(CONNECTING, new Picker(PickResult.withSubchannel(subchannel)));
updateBalancingState(CONNECTING, new Picker(PickResult.withSubchannel(subchannel)));
subchannel.requestConnection();
} else {
subchannel.updateAddresses(servers);
Expand All @@ -86,20 +87,34 @@ public void handleNameResolutionError(Status error) {
}
// NB(lukaszx0) Whether we should propagate the error unconditionally is arguable. It's fine
// for time being.
helper.updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error)));
updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error)));
}

private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
ConnectivityState currentState = stateInfo.getState();
if (currentState == SHUTDOWN) {
ConnectivityState newState = stateInfo.getState();
if (newState == SHUTDOWN) {
return;
}
if (stateInfo.getState() == TRANSIENT_FAILURE || stateInfo.getState() == IDLE) {
if (newState == TRANSIENT_FAILURE || newState == IDLE) {
helper.refreshNameResolution();
}

// If we are transitioning from a TRANSIENT_FAILURE to CONNECTING or IDLE we ignore this state
// transition and still keep the LB in TRANSIENT_FAILURE state. This is referred to as "sticky
// transient failure". Only a subchannel state change to READY will get the LB out of
// TRANSIENT_FAILURE. If the state is IDLE we additionally request a new connection so that we
// keep retrying for a connection.
if (currentState == TRANSIENT_FAILURE) {
if (newState == CONNECTING) {
return;
} else if (newState == IDLE) {
requestConnection();
return;
}
}

SubchannelPicker picker;
switch (currentState) {
switch (newState) {
case IDLE:
picker = new RequestConnectionPicker(subchannel);
break;
Expand All @@ -115,9 +130,15 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo
picker = new Picker(PickResult.withError(stateInfo.getStatus()));
break;
default:
throw new IllegalArgumentException("Unsupported state:" + currentState);
throw new IllegalArgumentException("Unsupported state:" + newState);
}
helper.updateBalancingState(currentState, picker);

updateBalancingState(newState, picker);
}

private void updateBalancingState(ConnectivityState state, SubchannelPicker picker) {
currentState = state;
helper.updateBalancingState(state, picker);
}

@Override
Expand Down
47 changes: 42 additions & 5 deletions core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -260,17 +260,17 @@ public void pickAfterStateChangeAfterResolution() throws Exception {
reset(mockHelper);
when(mockHelper.getSynchronizationContext()).thenReturn(syncContext);

stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE));
inOrder.verify(mockHelper).refreshNameResolution();
inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture());
assertEquals(Status.OK, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());

Status error = Status.UNAVAILABLE.withDescription("boom!");
stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error));
inOrder.verify(mockHelper).refreshNameResolution();
inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());

stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE));
inOrder.verify(mockHelper).refreshNameResolution();
inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture());
assertEquals(Status.OK, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());

stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY));
inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture());
assertEquals(subchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel());
Expand All @@ -279,6 +279,43 @@ public void pickAfterStateChangeAfterResolution() throws Exception {
verifyNoMoreInteractions(mockHelper);
}

@Test
public void pickAfterResolutionAfterTransientValue() throws Exception {
InOrder inOrder = inOrder(mockHelper);

loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());
inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture());
CreateSubchannelArgs args = createArgsCaptor.getValue();
assertThat(args.getAddresses()).isEqualTo(servers);
verify(mockSubchannel).start(stateListenerCaptor.capture());
SubchannelStateListener stateListener = stateListenerCaptor.getValue();
verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
verify(mockSubchannel).requestConnection();
reset(mockHelper);
when(mockHelper.getSynchronizationContext()).thenReturn(syncContext);

// An error has happened.
Status error = Status.UNAVAILABLE.withDescription("boom!");
stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error));
inOrder.verify(mockHelper).refreshNameResolution();
inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());

// But a subsequent IDLE update should be ignored and the LB state not updated. Additionally,
// a request for a new connection should be made keep the subchannel trying to connect.
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE));
inOrder.verify(mockHelper).refreshNameResolution();
verifyNoMoreInteractions(mockHelper);
assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());
verify(mockSubchannel, times(2)).requestConnection();

// Transition from TRANSIENT_ERROR to CONNECTING should also be ignored.
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));
verifyNoMoreInteractions(mockHelper);
assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());
}

@Test
public void nameResolutionError() throws Exception {
Status error = Status.NOT_FOUND.withDescription("nameResolutionError");
Expand Down

0 comments on commit fbc8679

Please sign in to comment.