From 6a60fa92fcbade9b814a5f65050eb592910f4e6d Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Fri, 15 Oct 2021 10:44:48 -0700 Subject: [PATCH] [grpclb_empty_address] grpclb: recover after receiving an empty server list --- balancer/grpclb/grpclb_remote_balancer.go | 12 +++- balancer/grpclb/grpclb_test.go | 76 +++++++++++++++++++++++ 2 files changed, 86 insertions(+), 2 deletions(-) diff --git a/balancer/grpclb/grpclb_remote_balancer.go b/balancer/grpclb/grpclb_remote_balancer.go index 0210c012d7b0..330df4baa218 100644 --- a/balancer/grpclb/grpclb_remote_balancer.go +++ b/balancer/grpclb/grpclb_remote_balancer.go @@ -135,11 +135,19 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback } if lb.usePickFirst { - var sc balancer.SubConn - for _, sc = range lb.subConns { + var ( + scKey resolver.Address + sc balancer.SubConn + ) + for scKey, sc = range lb.subConns { break } if sc != nil { + if len(backendAddrs) == 0 { + lb.cc.cc.RemoveSubConn(sc) + delete(lb.subConns, scKey) + return + } lb.cc.cc.UpdateAddresses(sc, backendAddrs) sc.Connect() return diff --git a/balancer/grpclb/grpclb_test.go b/balancer/grpclb/grpclb_test.go index 3b666764728f..34e9f95c348c 100644 --- a/balancer/grpclb/grpclb_test.go +++ b/balancer/grpclb/grpclb_test.go @@ -1274,6 +1274,82 @@ func (s) TestGRPCLBBackendConnectionErrorPropagation(t *testing.T) { wg.Wait() } +func testGRPCLBEmptyServerList(t *testing.T, svcfg string) { + r := manual.NewBuilderWithScheme("whatever") + + tss, cleanup, err := startBackendsAndRemoteLoadBalancer(1, "", nil) + if err != nil { + t.Fatalf("failed to create new load balancer: %v", err) + } + defer cleanup() + + beServers := []*lbpb.Server{{ + IpAddress: tss.beIPs[0], + Port: int32(tss.bePorts[0]), + LoadBalanceToken: lbToken, + }} + + creds := serverNameCheckCreds{} + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, grpc.WithResolvers(r), + grpc.WithTransportCredentials(&creds), grpc.WithContextDialer(fakeNameDialer)) + if err != nil { + t.Fatalf("Failed to dial to the backend %v", err) + } + defer cc.Close() + testC := testpb.NewTestServiceClient(cc) + + tss.ls.sls <- &lbpb.ServerList{Servers: beServers} + + scpr := r.CC.ParseServiceConfig(svcfg) + if scpr.Err != nil { + t.Fatalf("Error parsing config %q: %v", svcfg, scpr.Err) + } + + r.UpdateState(resolver.State{ + Addresses: []resolver.Address{{ + Addr: tss.lbAddr, + Type: resolver.GRPCLB, + ServerName: lbServerName, + }}, + ServiceConfig: scpr, + }) + t.Log("Perform an initial RPC and expect it to succeed...") + if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + t.Fatalf("Initial _.EmptyCall(_, _) = _, %v, want _, ", err) + } + t.Log("Now send an empty server list. Wait until we see an RPC failure to make sure the cliet got it...") + tss.ls.sls <- &lbpb.ServerList{} + gotUnavailable := false + for i := 0; i < 100 && !gotUnavailable; i++ { + func() { + ctx, cancel := context.WithTimeout(ctx, time.Second*1) + defer cancel() + _, err := testC.EmptyCall(ctx, &testpb.Empty{}) + if status.Code(err) == codes.Unavailable { + gotUnavailable = true + } + }() + } + if !gotUnavailable { + t.Fatalf("Expected to eventually see an RPC fail with unavailable after the grpclb sends an empty server list, but none did.") + } + t.Log("Now send a non-empty server list. A wait-for-ready RPC should now succeed...") + tss.ls.sls <- &lbpb.ServerList{Servers: beServers} + if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + t.Fatalf("Final _.EmptyCall(_, _) = _, %v, want _, ", err) + } +} + +func (s) TestGRPCLBEmptyServerListRoundRobin(t *testing.T) { + testGRPCLBEmptyServerList(t, `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"round_robin":{}}]}}]}`) +} + +func (s) TestGRPCLBEmptyServerListPickFirst(t *testing.T) { + testGRPCLBEmptyServerList(t, `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`) +} + func (s) TestGRPCLBWithTargetNameFieldInConfig(t *testing.T) { r := manual.NewBuilderWithScheme("whatever")