Skip to content

Commit

Permalink
[grpclb_empty_address] grpclb: recover after receiving an empty serve…
Browse files Browse the repository at this point in the history
…r list
  • Loading branch information
menghanl committed Oct 15, 2021
1 parent 3db1cb0 commit 6a60fa9
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 2 deletions.
12 changes: 10 additions & 2 deletions balancer/grpclb/grpclb_remote_balancer.go
Expand Up @@ -135,11 +135,19 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
}

if lb.usePickFirst {
var sc balancer.SubConn
for _, sc = range lb.subConns {
var (
scKey resolver.Address
sc balancer.SubConn
)
for scKey, sc = range lb.subConns {
break
}
if sc != nil {
if len(backendAddrs) == 0 {
lb.cc.cc.RemoveSubConn(sc)
delete(lb.subConns, scKey)
return
}
lb.cc.cc.UpdateAddresses(sc, backendAddrs)
sc.Connect()
return
Expand Down
76 changes: 76 additions & 0 deletions balancer/grpclb/grpclb_test.go
Expand Up @@ -1274,6 +1274,82 @@ func (s) TestGRPCLBBackendConnectionErrorPropagation(t *testing.T) {
wg.Wait()
}

func testGRPCLBEmptyServerList(t *testing.T, svcfg string) {
r := manual.NewBuilderWithScheme("whatever")

tss, cleanup, err := startBackendsAndRemoteLoadBalancer(1, "", nil)
if err != nil {
t.Fatalf("failed to create new load balancer: %v", err)
}
defer cleanup()

beServers := []*lbpb.Server{{
IpAddress: tss.beIPs[0],
Port: int32(tss.bePorts[0]),
LoadBalanceToken: lbToken,
}}

creds := serverNameCheckCreds{}
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, grpc.WithResolvers(r),
grpc.WithTransportCredentials(&creds), grpc.WithContextDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
}
defer cc.Close()
testC := testpb.NewTestServiceClient(cc)

tss.ls.sls <- &lbpb.ServerList{Servers: beServers}

scpr := r.CC.ParseServiceConfig(svcfg)
if scpr.Err != nil {
t.Fatalf("Error parsing config %q: %v", svcfg, scpr.Err)
}

r.UpdateState(resolver.State{
Addresses: []resolver.Address{{
Addr: tss.lbAddr,
Type: resolver.GRPCLB,
ServerName: lbServerName,
}},
ServiceConfig: scpr,
})
t.Log("Perform an initial RPC and expect it to succeed...")
if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("Initial _.EmptyCall(_, _) = _, %v, want _, <nil>", err)
}
t.Log("Now send an empty server list. Wait until we see an RPC failure to make sure the cliet got it...")
tss.ls.sls <- &lbpb.ServerList{}
gotUnavailable := false
for i := 0; i < 100 && !gotUnavailable; i++ {
func() {
ctx, cancel := context.WithTimeout(ctx, time.Second*1)
defer cancel()
_, err := testC.EmptyCall(ctx, &testpb.Empty{})
if status.Code(err) == codes.Unavailable {
gotUnavailable = true
}
}()
}
if !gotUnavailable {
t.Fatalf("Expected to eventually see an RPC fail with unavailable after the grpclb sends an empty server list, but none did.")
}
t.Log("Now send a non-empty server list. A wait-for-ready RPC should now succeed...")
tss.ls.sls <- &lbpb.ServerList{Servers: beServers}
if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("Final _.EmptyCall(_, _) = _, %v, want _, <nil>", err)
}
}

func (s) TestGRPCLBEmptyServerListRoundRobin(t *testing.T) {
testGRPCLBEmptyServerList(t, `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"round_robin":{}}]}}]}`)
}

func (s) TestGRPCLBEmptyServerListPickFirst(t *testing.T) {
testGRPCLBEmptyServerList(t, `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`)
}

func (s) TestGRPCLBWithTargetNameFieldInConfig(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")

Expand Down

0 comments on commit 6a60fa9

Please sign in to comment.