diff --git a/api/src/main/java/io/grpc/LoadBalancer.java b/api/src/main/java/io/grpc/LoadBalancer.java index 04431fbd620..4a39ce9d40f 100644 --- a/api/src/main/java/io/grpc/LoadBalancer.java +++ b/api/src/main/java/io/grpc/LoadBalancer.java @@ -1059,6 +1059,26 @@ public void refreshNameResolution() { throw new UnsupportedOperationException(); } + /** + * Historically the channel automatically refreshes name resolution if any subchannel + * connection is broken. It's transitioning to let load balancers make the decision. To + * avoid silent breakages, the channel checks if {@link #refreshNameResolution} is called + * by the load balancer. If not, it will do it and log a warning. This will be removed in + * the future and load balancers are completely responsible for triggering the refresh. + * See #8088 for the background. + * + *

<p>This should rarely be used, but sometimes the address for the subchannel wasn't + * provided by the name resolver and a refresh needs to be directed somewhere else instead. + * Then you can call this method to disable the short-term check for detecting LoadBalancers + * that need to be updated for the new expected behavior. + * + * @since 1.38.0 + */ + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/8088") + public void ignoreRefreshNameResolutionCheck() { + // no-op + } + /** * Returns a {@link SynchronizationContext} that runs tasks in the same Synchronization Context * as that the callback methods on the {@link LoadBalancer} interface are run in. diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 4b3d5109b35..1bd6b3f5ab6 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -117,6 +117,7 @@ @ThreadSafe final class ManagedChannelImpl extends ManagedChannel implements InternalInstrumented { + @VisibleForTesting static final Logger logger = Logger.getLogger(ManagedChannelImpl.class.getName()); // Matching this pattern means the target string is a URI target or at least intended to be one. 
@@ -1440,6 +1441,8 @@ void remove(RetriableStream retriableStream) { private final class LbHelperImpl extends LoadBalancer.Helper { AutoConfiguredLoadBalancer lb; + boolean nsRefreshedByLb; + boolean ignoreRefreshNsCheck; @Override public AbstractSubchannel createSubchannel(CreateSubchannelArgs args) { @@ -1478,6 +1481,7 @@ public void run() { @Override public void refreshNameResolution() { syncContext.throwIfNotInThisSynchronizationContext(); + nsRefreshedByLb = true; final class LoadBalancerRefreshNameResolution implements Runnable { @Override public void run() { @@ -1488,6 +1492,11 @@ public void run() { syncContext.execute(new LoadBalancerRefreshNameResolution()); } + @Override + public void ignoreRefreshNameResolutionCheck() { + ignoreRefreshNsCheck = true; + } + @Override public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) { return createOobChannel(Collections.singletonList(addressGroup), authority); @@ -1530,6 +1539,8 @@ void onTerminated(InternalSubchannel is) { @Override void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { + // TODO(chengyuanzhang): change to let LB policies explicitly manage OOB channel's + // state and refresh name resolution if necessary. handleInternalSubchannelState(newState); oobChannel.handleSubchannelStateChange(newState); } @@ -1951,9 +1962,18 @@ void onTerminated(InternalSubchannel is) { @Override void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { - handleInternalSubchannelState(newState); checkState(listener != null, "listener is null"); listener.onSubchannelState(newState); + if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) { + if (!helper.ignoreRefreshNsCheck && !helper.nsRefreshedByLb) { + logger.log(Level.WARNING, + "LoadBalancer should call Helper.refreshNameResolution() to refresh name " + + "resolution if subchannel state becomes TRANSIENT_FAILURE or IDLE. 
" + + "This will no longer happen automatically in the future releases"); + refreshAndResetNameResolution(); + helper.nsRefreshedByLb = true; + } + } } @Override diff --git a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java index 535b34b015d..d5f74db54a7 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java @@ -18,6 +18,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static io.grpc.ConnectivityState.CONNECTING; +import static io.grpc.ConnectivityState.IDLE; import static io.grpc.ConnectivityState.SHUTDOWN; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; @@ -84,6 +85,9 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo if (currentState == SHUTDOWN) { return; } + if (stateInfo.getState() == TRANSIENT_FAILURE || stateInfo.getState() == IDLE) { + helper.refreshNameResolution(); + } SubchannelPicker picker; switch (currentState) { diff --git a/core/src/main/java/io/grpc/util/ForwardingLoadBalancerHelper.java b/core/src/main/java/io/grpc/util/ForwardingLoadBalancerHelper.java index 42299571f84..05db207806d 100644 --- a/core/src/main/java/io/grpc/util/ForwardingLoadBalancerHelper.java +++ b/core/src/main/java/io/grpc/util/ForwardingLoadBalancerHelper.java @@ -94,6 +94,11 @@ public void refreshNameResolution() { delegate().refreshNameResolution(); } + @Override + public void ignoreRefreshNameResolutionCheck() { + delegate().ignoreRefreshNameResolutionCheck(); + } + @Override public String getAuthority() { return delegate().getAuthority(); diff --git a/core/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java b/core/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java index 179755cbf8e..846ed90aecc 100644 --- a/core/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java +++ 
b/core/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java @@ -139,6 +139,9 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo if (subchannels.get(stripAttrs(subchannel.getAddresses())) != subchannel) { return; } + if (stateInfo.getState() == TRANSIENT_FAILURE || stateInfo.getState() == IDLE) { + helper.refreshNameResolution(); + } if (stateInfo.getState() == IDLE) { subchannel.requestConnection(); } diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 7f896892d0d..c0868baeecb 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -139,6 +139,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Handler; +import java.util.logging.Level; +import java.util.logging.LogRecord; import javax.annotation.Nullable; import org.junit.After; import org.junit.Assert; @@ -277,6 +280,7 @@ public String getPolicyName() { private boolean requestConnection = true; private BlockingQueue transports; private boolean panicExpected; + private final List logs = new ArrayList<>(); @Captor private ArgumentCaptor resolvedAddressCaptor; @@ -328,6 +332,22 @@ public void setUp() throws Exception { when(executorPool.getObject()).thenReturn(executor.getScheduledExecutorService()); when(balancerRpcExecutorPool.getObject()) .thenReturn(balancerRpcExecutor.getScheduledExecutorService()); + Handler handler = new Handler() { + @Override + public void publish(LogRecord record) { + logs.add(record); + } + + @Override + public void flush() { + } + + @Override + public void close() throws SecurityException { + } + }; + ManagedChannelImpl.logger.addHandler(handler); + ManagedChannelImpl.logger.setLevel(Level.ALL); channelBuilder = new 
ManagedChannelImplBuilder(TARGET, new UnsupportedClientTransportFactoryBuilder(), new FixedPortProvider(DEFAULT_PORT)); @@ -1539,6 +1559,103 @@ public void run() { timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS); } + @Test + public void subchannelConnectionBroken_noLbRefreshingResolver_logWarningAndTriggeRefresh() { + FakeNameResolverFactory nameResolverFactory = + new FakeNameResolverFactory.Builder(expectedUri) + .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) + .build(); + channelBuilder.nameResolverFactory(nameResolverFactory); + createChannel(); + FakeNameResolverFactory.FakeNameResolver resolver = + Iterables.getOnlyElement(nameResolverFactory.resolvers); + assertThat(resolver.refreshCalled).isEqualTo(0); + + Subchannel subchannel = + createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener); + InternalSubchannel internalSubchannel = + (InternalSubchannel) subchannel.getInternalSubchannel(); + internalSubchannel.obtainActiveTransport(); + MockClientTransportInfo transportInfo = transports.poll(); + + // Break subchannel connection + transportInfo.listener.transportShutdown(Status.UNAVAILABLE.withDescription("unreachable")); + LogRecord log = Iterables.getOnlyElement(logs); + assertThat(log.getLevel()).isEqualTo(Level.WARNING); + assertThat(log.getMessage()).isEqualTo( + "LoadBalancer should call Helper.refreshNameResolution() to refresh name resolution if " + + "subchannel state becomes TRANSIENT_FAILURE or IDLE. 
This will no longer happen " + + "automatically in the future releases"); + assertThat(resolver.refreshCalled).isEqualTo(1); + } + + @Test + public void subchannelConnectionBroken_ResolverRefreshedByLb() { + FakeNameResolverFactory nameResolverFactory = + new FakeNameResolverFactory.Builder(expectedUri) + .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) + .build(); + channelBuilder.nameResolverFactory(nameResolverFactory); + createChannel(); + FakeNameResolverFactory.FakeNameResolver resolver = + Iterables.getOnlyElement(nameResolverFactory.resolvers); + assertThat(resolver.refreshCalled).isEqualTo(0); + ArgumentCaptor helperCaptor = ArgumentCaptor.forClass(Helper.class); + verify(mockLoadBalancerProvider).newLoadBalancer(helperCaptor.capture()); + helper = helperCaptor.getValue(); + + SubchannelStateListener listener = new SubchannelStateListener() { + @Override + public void onSubchannelState(ConnectivityStateInfo newState) { + // Normal LoadBalancer should refresh name resolution when some subchannel enters + // TRANSIENT_FAILURE or IDLE + if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) { + helper.refreshNameResolution(); + } + } + }; + Subchannel subchannel = + createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, listener); + InternalSubchannel internalSubchannel = + (InternalSubchannel) subchannel.getInternalSubchannel(); + internalSubchannel.obtainActiveTransport(); + MockClientTransportInfo transportInfo = transports.poll(); + + // Break subchannel connection and simulate load balancer refreshing name resolution + transportInfo.listener.transportShutdown(Status.UNAVAILABLE.withDescription("unreachable")); + assertThat(logs).isEmpty(); + assertThat(resolver.refreshCalled).isEqualTo(1); + } + + @Test + public void subchannelConnectionBroken_ignoreRefreshNameResolutionCheck_noRefresh() { + FakeNameResolverFactory nameResolverFactory = + new FakeNameResolverFactory.Builder(expectedUri) + 
.setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) + .build(); + channelBuilder.nameResolverFactory(nameResolverFactory); + createChannel(); + FakeNameResolverFactory.FakeNameResolver resolver = + Iterables.getOnlyElement(nameResolverFactory.resolvers); + assertThat(resolver.refreshCalled).isEqualTo(0); + ArgumentCaptor helperCaptor = ArgumentCaptor.forClass(Helper.class); + verify(mockLoadBalancerProvider).newLoadBalancer(helperCaptor.capture()); + helper = helperCaptor.getValue(); + helper.ignoreRefreshNameResolutionCheck(); + + Subchannel subchannel = + createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener); + InternalSubchannel internalSubchannel = + (InternalSubchannel) subchannel.getInternalSubchannel(); + internalSubchannel.obtainActiveTransport(); + MockClientTransportInfo transportInfo = transports.poll(); + + // Break subchannel connection + transportInfo.listener.transportShutdown(Status.UNAVAILABLE.withDescription("unreachable")); + assertThat(logs).isEmpty(); + assertThat(resolver.refreshCalled).isEqualTo(0); + } + @Test public void subchannelStringableBeforeStart() { createChannel(); @@ -2095,43 +2212,26 @@ public void lbHelper_getNameResolverRegistry() { .isSameInstanceAs(NameResolverRegistry.getDefaultRegistry()); } - @Test - public void refreshNameResolution_whenSubchannelConnectionFailed_notIdle() { - subtestNameResolutionRefreshWhenConnectionFailed(false, false); - } - @Test public void refreshNameResolution_whenOobChannelConnectionFailed_notIdle() { - subtestNameResolutionRefreshWhenConnectionFailed(true, false); - } - - @Test - public void notRefreshNameResolution_whenSubchannelConnectionFailed_idle() { - subtestNameResolutionRefreshWhenConnectionFailed(false, true); + subtestNameResolutionRefreshWhenConnectionFailed(false); } @Test public void notRefreshNameResolution_whenOobChannelConnectionFailed_idle() { - subtestNameResolutionRefreshWhenConnectionFailed(true, true); + 
subtestNameResolutionRefreshWhenConnectionFailed(true); } - private void subtestNameResolutionRefreshWhenConnectionFailed( - boolean isOobChannel, boolean isIdle) { + private void subtestNameResolutionRefreshWhenConnectionFailed(boolean isIdle) { FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory.Builder(expectedUri) .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) .build(); channelBuilder.nameResolverFactory(nameResolverFactory); createChannel(); - if (isOobChannel) { - OobChannel oobChannel = (OobChannel) helper.createOobChannel( - Collections.singletonList(addressGroup), "oobAuthority"); - oobChannel.getSubchannel().requestConnection(); - } else { - Subchannel subchannel = - createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener); - requestConnectionSafely(helper, subchannel); - } + OobChannel oobChannel = (OobChannel) helper.createOobChannel( + Collections.singletonList(addressGroup), "oobAuthority"); + oobChannel.getSubchannel().requestConnection(); MockClientTransportInfo transportInfo = transports.poll(); assertNotNull(transportInfo); diff --git a/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java b/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java index 1c3e0cb1a1b..720341da0cb 100644 --- a/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java @@ -22,6 +22,8 @@ import static io.grpc.ConnectivityState.READY; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; @@ -160,6 +162,38 @@ public void requestConnectionPicker() throws Exception { verify(mockSubchannel, 
times(2)).requestConnection(); } + @Test + public void refreshNameResolutionAfterSubchannelConnectionBroken() { + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build()); + verify(mockHelper).createSubchannel(any(CreateSubchannelArgs.class)); + + InOrder inOrder = inOrder(mockHelper, mockSubchannel); + inOrder.verify(mockSubchannel).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + assertSame(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel()); + inOrder.verify(mockSubchannel).requestConnection(); + + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + assertNull(pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel()); + Status error = Status.UNAUTHENTICATED.withDescription("permission denied"); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + inOrder.verify(mockHelper).refreshNameResolution(); + inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus()); + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + assertSame(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel()); + // Simulate receiving go-away so the subchannel transit to IDLE. 
+ stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE)); + inOrder.verify(mockHelper).refreshNameResolution(); + inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + + verifyNoMoreInteractions(mockHelper, mockSubchannel); + } + @Test public void pickAfterResolvedAndUnchanged() throws Exception { loadBalancer.handleResolvedAddresses( @@ -225,10 +259,12 @@ public void pickAfterStateChangeAfterResolution() throws Exception { Status error = Status.UNAVAILABLE.withDescription("boom!"); stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + inOrder.verify(mockHelper).refreshNameResolution(); inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus()); stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE)); + inOrder.verify(mockHelper).refreshNameResolution(); inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); assertEquals(Status.OK, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus()); @@ -294,6 +330,7 @@ public void nameResolutionErrorWithStateChanges() throws Exception { SubchannelStateListener stateListener = stateListenerCaptor.getValue(); stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE)); + inOrder.verify(mockHelper).refreshNameResolution(); inOrder.verify(mockHelper).updateBalancingState( eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); diff --git a/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java b/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java index e9f4ff9b439..5def703e7b4 100644 --- a/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java @@ -277,11 +277,13 @@ public void pickAfterStateChange() throws Exception { 
ConnectivityStateInfo.forTransientFailure(error)); assertThat(subchannelStateInfo.value.getState()).isEqualTo(TRANSIENT_FAILURE); assertThat(subchannelStateInfo.value.getStatus()).isEqualTo(error); + inOrder.verify(mockHelper).refreshNameResolution(); inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); assertThat(pickerCaptor.getValue()).isInstanceOf(EmptyPicker.class); deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE)); + inOrder.verify(mockHelper).refreshNameResolution(); assertThat(subchannelStateInfo.value.getState()).isEqualTo(TRANSIENT_FAILURE); assertThat(subchannelStateInfo.value.getStatus()).isEqualTo(error); @@ -305,9 +307,7 @@ public void stayTransientFailureUntilReady() { deliverSubchannelState( sc, ConnectivityStateInfo.forTransientFailure(error)); - deliverSubchannelState( - sc, - ConnectivityStateInfo.forNonError(IDLE)); + inOrder.verify(mockHelper).refreshNameResolution(); deliverSubchannelState( sc, ConnectivityStateInfo.forNonError(CONNECTING)); @@ -330,6 +330,35 @@ public void stayTransientFailureUntilReady() { verifyNoMoreInteractions(mockHelper); } + @Test + public void refreshNameResolutionWhenSubchannelConnectionBroken() { + InOrder inOrder = inOrder(mockHelper); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(Attributes.EMPTY) + .build()); + + verify(mockHelper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), isA(EmptyPicker.class)); + + // Simulate state transitions for each subchannel individually. 
+ for (Subchannel sc : loadBalancer.getSubchannels()) { + verify(sc).requestConnection(); + deliverSubchannelState(sc, ConnectivityStateInfo.forNonError(CONNECTING)); + Status error = Status.UNKNOWN.withDescription("connection broken"); + deliverSubchannelState(sc, ConnectivityStateInfo.forTransientFailure(error)); + inOrder.verify(mockHelper).refreshNameResolution(); + deliverSubchannelState(sc, ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(mockHelper).updateBalancingState(eq(READY), isA(ReadyPicker.class)); + // Simulate receiving go-away so READY subchannels transit to IDLE. + deliverSubchannelState(sc, ConnectivityStateInfo.forNonError(IDLE)); + inOrder.verify(mockHelper).refreshNameResolution(); + verify(sc, times(2)).requestConnection(); + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), isA(EmptyPicker.class)); + } + + verifyNoMoreInteractions(mockHelper); + } + @Test public void pickerRoundRobin() throws Exception { Subchannel subchannel = mock(Subchannel.class); diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java index c8bf77076c3..8c638b979ed 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java @@ -222,6 +222,10 @@ void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState if (config.getMode() == Mode.ROUND_ROBIN && newState.getState() == IDLE) { subchannel.requestConnection(); } + if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) { + helper.refreshNameResolution(); + } + AtomicReference stateInfoRef = subchannel.getAttributes().get(STATE_INFO); // If all RR servers are unhealthy, it's possible that at least one connection is CONNECTING at diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index 4fc3d122347..0c194ae84c9 100644 --- 
a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -1003,6 +1003,7 @@ public void grpclbWorking() { verify(subchannel1).requestConnection(); deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(IDLE)); verify(subchannel1, times(2)).requestConnection(); + inOrder.verify(helper).refreshNameResolution(); inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); assertThat(logs).containsExactly( "INFO: [grpclb-] Update balancing state to READY: picks=" @@ -1022,6 +1023,7 @@ public void grpclbWorking() { ConnectivityStateInfo errorState1 = ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE.withDescription("error1")); deliverSubchannelState(subchannel1, errorState1); + inOrder.verify(helper).refreshNameResolution(); inOrder.verifyNoMoreInteractions(); // If no subchannel is READY, some with error and the others are IDLE, will report CONNECTING @@ -1183,7 +1185,9 @@ public void roundRobinMode_subchannelStayTransientFailureUntilReady() { // Switch all subchannels to TRANSIENT_FAILURE, making the general state TRANSIENT_FAILURE too. Status error = Status.UNAVAILABLE.withDescription("error"); deliverSubchannelState(subchannel1, ConnectivityStateInfo.forTransientFailure(error)); + inOrder.verify(helper).refreshNameResolution(); deliverSubchannelState(subchannel2, ConnectivityStateInfo.forTransientFailure(error)); + inOrder.verify(helper).refreshNameResolution(); inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); assertThat(((RoundRobinPicker) pickerCaptor.getValue()).pickList) .containsExactly(new ErrorEntry(error)); @@ -1191,6 +1195,7 @@ public void roundRobinMode_subchannelStayTransientFailureUntilReady() { // Switch subchannel1 to IDLE, then to CONNECTING, which are ignored since the previous // subchannel state is TRANSIENT_FAILURE. General state is unchanged. 
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(IDLE)); + inOrder.verify(helper).refreshNameResolution(); deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); inOrder.verifyNoMoreInteractions(); @@ -1549,11 +1554,13 @@ private void subtestGrpclbFallbackConnectionLost( lbResponseObserver = lbResponseObserverCaptor.getValue(); assertEquals(1, lbRequestObservers.size()); lbRequestObserver = lbRequestObservers.poll(); + inOrder.verify(helper).refreshNameResolution(); } if (allSubchannelsBroken) { for (Subchannel subchannel : subchannels) { // A READY subchannel transits to IDLE when receiving a go-away deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE)); + inOrder.verify(helper).refreshNameResolution(); } } @@ -1566,6 +1573,7 @@ private void subtestGrpclbFallbackConnectionLost( // connections are lost for (Subchannel subchannel : subchannels) { deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE)); + inOrder.verify(helper).refreshNameResolution(); } // Exit fallback mode or cancel fallback timer when receiving a new server list from balancer diff --git a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java index f3a655289d7..3b7e84bd543 100644 --- a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java +++ b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java @@ -211,6 +211,7 @@ public void lb_working_withDefaultTarget() throws Exception { // search subchannel is down, rescue subchannel is connecting searchSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.NOT_FOUND)); + inOrder.verify(helper) .updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); @@ -474,6 +475,11 @@ public void updateBalancingState( // no-op } + @Override + public void refreshNameResolution() { + // no-op + } + @Override public String getAuthority() { return "fake-bigtable.googleapis.com"; diff 
--git a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java index 4ee0e773a2d..37dc4e741a8 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java @@ -279,8 +279,10 @@ private void handleEndpointResolutionError() { private final class RefreshableHelper extends ForwardingLoadBalancerHelper { private final Helper delegate; + @SuppressWarnings("deprecation") private RefreshableHelper(Helper delegate) { this.delegate = checkNotNull(delegate, "delegate"); + delegate.ignoreRefreshNameResolutionCheck(); } @Override diff --git a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java index bdbaa238e21..6ca61f1bdf4 100644 --- a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java @@ -203,6 +203,9 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo if (subchannels.get(stripAttrs(subchannel.getAddresses())) != subchannel) { return; } + if (stateInfo.getState() == TRANSIENT_FAILURE || stateInfo.getState() == IDLE) { + helper.refreshNameResolution(); + } Ref subchannelStateRef = getSubchannelStateInfoRef(subchannel); // Don't proactively reconnect if the subchannel enters IDLE, even if previously was connected. 
diff --git a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java index 3c2c226cd4f..264f6232c0c 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java @@ -460,6 +460,7 @@ public void onlyLogicalDnsCluster_handleRefreshNameResolution() { FakeNameResolver resolver = Iterables.getOnlyElement(resolvers); resolver.deliverEndpointAddresses(Arrays.asList(endpoint1, endpoint2)); assertThat(resolver.refreshCount).isEqualTo(0); + verify(helper).ignoreRefreshNameResolutionCheck(); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); childBalancer.helper.refreshNameResolution(); assertThat(resolver.refreshCount).isEqualTo(1); @@ -519,6 +520,7 @@ public void onlyLogicalDnsCluster_refreshNameResolutionRaceWithResolutionError() FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); assertAddressesEqual(Collections.singletonList(endpoint), childBalancer.addresses); assertThat(resolver.refreshCount).isEqualTo(0); + verify(helper).ignoreRefreshNameResolutionCheck(); childBalancer.helper.refreshNameResolution(); assertThat(resolver.refreshCount).isEqualTo(1); diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java index 6b70e5974df..5243ff8401b 100644 --- a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java @@ -233,12 +233,14 @@ public void aggregateSubchannelStates_connectingReadyIdleFailure() { subchannels.get(Collections.singletonList(servers.get(0))), ConnectivityStateInfo.forTransientFailure( Status.UNKNOWN.withDescription("unknown failure"))); + inOrder.verify(helper).refreshNameResolution(); inOrder.verify(helper).updateBalancingState(eq(READY), any(SubchannelPicker.class)); // one in TRANSIENT_FAILURE, 
one in IDLE deliverSubchannelState( subchannels.get(Collections.singletonList(servers.get(1))), ConnectivityStateInfo.forNonError(IDLE)); + inOrder.verify(helper).refreshNameResolution(); inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); verifyNoMoreInteractions(helper); @@ -260,6 +262,7 @@ public void aggregateSubchannelStates_twoOrMoreSubchannelsInTransientFailure() { subchannels.get(Collections.singletonList(servers.get(0))), ConnectivityStateInfo.forTransientFailure( Status.UNAVAILABLE.withDescription("not found"))); + inOrder.verify(helper).refreshNameResolution(); inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); // two in TRANSIENT_FAILURE, two in IDLE @@ -267,6 +270,7 @@ public void aggregateSubchannelStates_twoOrMoreSubchannelsInTransientFailure() { subchannels.get(Collections.singletonList(servers.get(1))), ConnectivityStateInfo.forTransientFailure( Status.UNAVAILABLE.withDescription("also not found"))); + inOrder.verify(helper).refreshNameResolution(); inOrder.verify(helper) .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); @@ -283,6 +287,7 @@ public void aggregateSubchannelStates_twoOrMoreSubchannelsInTransientFailure() { subchannels.get(Collections.singletonList(servers.get(3))), ConnectivityStateInfo.forTransientFailure( Status.UNAVAILABLE.withDescription("connection lost"))); + inOrder.verify(helper).refreshNameResolution(); inOrder.verify(helper) .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); @@ -311,6 +316,7 @@ public void subchannelStayInTransientFailureUntilBecomeReady() { deliverSubchannelState(subchannel, ConnectivityStateInfo.forTransientFailure( Status.UNAUTHENTICATED.withDescription("Permission denied"))); } + verify(helper, times(3)).refreshNameResolution(); // Stays in IDLE when until there are two or more subchannels in TRANSIENT_FAILURE. verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));