Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpclb: recover after receiving an empty server list #4879

Merged
merged 3 commits into from Oct 20, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 10 additions & 2 deletions balancer/grpclb/grpclb_remote_balancer.go
Expand Up @@ -135,11 +135,19 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
}

if lb.usePickFirst {
var sc balancer.SubConn
for _, sc = range lb.subConns {
var (
scKey resolver.Address
sc balancer.SubConn
)
for scKey, sc = range lb.subConns {
break
}
if sc != nil {
if len(backendAddrs) == 0 {
lb.cc.cc.RemoveSubConn(sc)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to handle this in UpdateAddresses(), so that every balancer need not handle it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. That's what #4881 is tracking.

delete(lb.subConns, scKey)
return
}
lb.cc.cc.UpdateAddresses(sc, backendAddrs)
sc.Connect()
return
Expand Down
72 changes: 72 additions & 0 deletions balancer/grpclb/grpclb_test.go
Expand Up @@ -1274,6 +1274,78 @@ func (s) TestGRPCLBBackendConnectionErrorPropagation(t *testing.T) {
wg.Wait()
}

// testGRPCLBEmptyServerList checks that a grpclb client recovers after it is
// sent an empty server list: an initial RPC must succeed, RPCs must start
// failing once an empty list has been sent, and a wait-for-ready RPC must
// succeed again after a non-empty list is sent.
//
// svcfg is the service config JSON that is parsed and pushed to the client
// through the manual resolver together with the balancer address.
func testGRPCLBEmptyServerList(t *testing.T, svcfg string) {
r := manual.NewBuilderWithScheme("whatever")

// One backend is requested; its IP and port are read back from tss below.
tss, cleanup, err := startBackendsAndRemoteLoadBalancer(1, "", nil)
if err != nil {
t.Fatalf("failed to create new load balancer: %v", err)
}
defer cleanup()

// The single-entry server list that is sent both before and after the
// empty list.
beServers := []*lbpb.Server{{
IpAddress: tss.beIPs[0],
Port: int32(tss.bePorts[0]),
LoadBalanceToken: lbToken,
}}

creds := serverNameCheckCreds{}
// ctx bounds the dial and every RPC issued by this test.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, grpc.WithResolvers(r),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor nit: Could we not split these lines in a random way. In a recent PR, I cleaned up this test to instead do

	cc, err := grpc.Dial(r.Scheme()+":///"+beServerName,
		grpc.WithResolvers(r),
		grpc.WithTransportCredentials(&serverNameCheckCreds{}),
		grpc.WithContextDialer(fakeNameDialer))
	if err != nil {
		t.Fatalf("Failed to dial to the backend %v", err)
	}

which is a tad more readable I think.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

grpc.WithTransportCredentials(&creds), grpc.WithContextDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
}
defer cc.Close()
testC := testpb.NewTestServiceClient(cc)

// Send the initial, non-empty server list on tss.ls.sls (presumably read by
// the test remote balancer started above; verify against its definition).
tss.ls.sls <- &lbpb.ServerList{Servers: beServers}

scpr := r.CC.ParseServiceConfig(svcfg)
if scpr.Err != nil {
t.Fatalf("Error parsing config %q: %v", svcfg, scpr.Err)
}

// Hand the client the balancer address along with the parsed service config.
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{
Addr: tss.lbAddr,
Type: resolver.GRPCLB,
ServerName: lbServerName,
}},
ServiceConfig: scpr,
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: similarly I cleaned up calls to UpdateState to include balancer addresses through an attribute instead of the deprecated Type field.

	rs = grpclbstate.Set(resolver.State{ServiceConfig: r.CC.ParseServiceConfig(grpclbConfig)},
		&grpclbstate.State{BalancerAddresses: []resolver.Address{{
			Addr:       tss.lbAddr,
			ServerName: lbServerName,
		}}})
	r.UpdateState(rs)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

t.Log("Perform an initial RPC and expect it to succeed...")
if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("Initial _.EmptyCall(_, _) = _, %v, want _, <nil>", err)
}
t.Log("Now send an empty server list. Wait until we see an RPC failure to make sure the client got it...")
tss.ls.sls <- &lbpb.ServerList{}
gotError := false
// Up to 100 RPCs without WaitForReady are attempted back to back, with no
// pause between them; the first failure ends the loop.
// NOTE(review): if the client has not acted on the empty list within those
// attempts the test fails below. Consider polling until ctx expires.
for i := 0; i < 100; i++ {
if _, err := testC.EmptyCall(ctx, &testpb.Empty{}); err != nil {
gotError = true
break
}
}
if !gotError {
t.Fatalf("Expected to eventually see an RPC fail after the grpclb sends an empty server list, but none did.")
}
t.Log("Now send a non-empty server list. A wait-for-ready RPC should now succeed...")
tss.ls.sls <- &lbpb.ServerList{Servers: beServers}
if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("Final _.EmptyCall(_, _) = _, %v, want _, <nil>", err)
}
}

// TestGRPCLBEmptyServerListRoundRobin runs testGRPCLBEmptyServerList with a
// service config that selects round_robin as the grpclb child policy.
func (s) TestGRPCLBEmptyServerListRoundRobin(t *testing.T) {
	const roundRobinConfig = `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"round_robin":{}}]}}]}`
	testGRPCLBEmptyServerList(t, roundRobinConfig)
}

// TestGRPCLBEmptyServerListPickFirst runs testGRPCLBEmptyServerList with a
// service config that selects pick_first as the grpclb child policy.
func (s) TestGRPCLBEmptyServerListPickFirst(t *testing.T) {
	const pickFirstConfig = `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`
	testGRPCLBEmptyServerList(t, pickFirstConfig)
}

func (s) TestGRPCLBWithTargetNameFieldInConfig(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")

Expand Down