From 5bf44136bb2d5a5e960caf5acd5574a001f2c951 Mon Sep 17 00:00:00 2001
From: Menghan Li
Date: Wed, 2 Oct 2019 10:04:32 -0700
Subject: [PATCH] xds: make balancer group restartable (#2999)

This is a preparatory change to support priority failover. It adds start()
and close() to balancer group, so we can have a balancer group that's not in
use, but has all the data and is ready to be started (think of a lower
priority that is kept ready while the higher priority is in use).

A balancer group is split into two parts, static and dynamic:
- static: the data from EDS; it gets updated even if the balancer group is
  closed
  - balancer IDs and builders, addresses for each balancer
- dynamic: the sub-balancers
  - These are only created when the balancer group is started, and are
    closed when the balancer group is closed. Only when the balancer group
    is started will the sub-balancers get address updates.
---
 .../balancer/edsbalancer/balancergroup.go     | 351 +++++++++++++-----
 .../edsbalancer/balancergroup_test.go         | 118 ++++++
 .../balancer/edsbalancer/edsbalancer.go       |   1 +
 .../balancer/edsbalancer/test_util_test.go    |  14 +-
 4 files changed, 388 insertions(+), 96 deletions(-)

diff --git a/xds/internal/balancer/edsbalancer/balancergroup.go b/xds/internal/balancer/edsbalancer/balancergroup.go
index 5b1392eca51..e46563aebf1 100644
--- a/xds/internal/balancer/edsbalancer/balancergroup.go
+++ b/xds/internal/balancer/edsbalancer/balancergroup.go
@@ -18,6 +18,7 @@ package edsbalancer

 import (
     "context"
+    "fmt"
     "sync"

     "google.golang.org/grpc/balancer"
@@ -31,6 +32,76 @@ import (
     orcapb "google.golang.org/grpc/xds/internal/proto/udpa/data/orca/v1/orca_load_report"
 )

+// subBalancerWithConfig keeps the configuration that will be used to start
+// the underlying balancer, and provides methods to start/stop the underlying
+// balancer.
+//
+// When the config changes, it will pass the update to the underlying balancer
+// if it exists.
+type subBalancerWithConfig struct {
+    // The static part of sub-balancer. Keeps balancerBuilders and addresses.
+    // To be used when restarting sub-balancer.
+    builder balancer.Builder
+    addrs   []resolver.Address
+    // The dynamic part of sub-balancer. Only used when balancer group is
+    // started. Gets cleared when sub-balancer is closed.
+    balancer balancer.Balancer
+}
+
+func (sbc *subBalancerWithConfig) startBalancer(bgcc *balancerGroupCC) {
+    b := sbc.builder.Build(bgcc, balancer.BuildOptions{})
+    sbc.balancer = b
+    if ub, ok := b.(balancer.V2Balancer); ok {
+        ub.UpdateClientConnState(balancer.ClientConnState{ResolverState: resolver.State{Addresses: sbc.addrs}})
+    } else {
+        b.HandleResolvedAddrs(sbc.addrs, nil)
+    }
+}
+
+func (sbc *subBalancerWithConfig) handleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
+    b := sbc.balancer
+    if b == nil {
+        // This sub-balancer was closed. This can happen when EDS removes a
+        // locality. The balancer for this locality was already closed, and the
+        // SubConns are being deleted. But SubConn state change can still
+        // happen.
+        return
+    }
+    if ub, ok := b.(balancer.V2Balancer); ok {
+        ub.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: state})
+    } else {
+        b.HandleSubConnStateChange(sc, state)
+    }
+}
+
+func (sbc *subBalancerWithConfig) updateAddrs(addrs []resolver.Address) {
+    sbc.addrs = addrs
+    b := sbc.balancer
+    if b == nil {
+        // This sub-balancer was closed. This should never happen because
+        // sub-balancers are closed when the locality is removed from EDS, or
+        // the balancer group is closed.
+        // There should be no further address updates when either of these
+        // happens.
+        //
+        // TODO: Update comment and delete the warning below.
+        // This will be a common case with priority support, because a
+        // sub-balancer (and the whole balancer group) could be closed because
+        // it's the lower priority, but it can still get address updates.
+        grpclog.Warningf("subBalancerWithConfig: updateAddrs is called when balancer is nil. This means this sub-balancer is closed.")
+        return
+    }
+    if ub, ok := b.(balancer.V2Balancer); ok {
+        ub.UpdateClientConnState(balancer.ClientConnState{ResolverState: resolver.State{Addresses: addrs}})
+    } else {
+        b.HandleResolvedAddrs(addrs, nil)
+    }
+}
+
+func (sbc *subBalancerWithConfig) stopBalancer() {
+    sbc.balancer.Close()
+    sbc.balancer = nil
+}
+
 type pickerState struct {
     weight uint32
     picker balancer.Picker
@@ -56,29 +127,87 @@ type pickerState struct {
 // - sub-pickers are grouped into a group-picker
 // - aggregated connectivity state is the overall state of all pickers.
 // - resolveNow
+//
+// Sub-balancers are only built when the balancer group is started. If the
+// balancer group is closed, the sub-balancers are also closed. It's
+// guaranteed that no updates will be sent to the parent ClientConn from a
+// closed balancer group.
 type balancerGroup struct {
-    cc balancer.ClientConn
-
-    mu           sync.Mutex
-    idToBalancer map[internal.Locality]balancer.Balancer
-    scToID       map[balancer.SubConn]internal.Locality
-    loadStore    lrs.Store
+    cc        balancer.ClientConn
+    loadStore lrs.Store

-    pickerMu sync.Mutex
-    // All balancer IDs exist as keys in this map. If an ID is not in map, it's
-    // either removed or never added.
+    // outgoingMu guards all operations in the direction:
+    // ClientConn-->Sub-balancer. Including start, stop, resolver updates and
+    // SubConn state changes.
+    //
+    // The corresponding boolean outgoingStarted is used to stop further updates
+    // to sub-balancers after they are closed.
+    outgoingMu         sync.Mutex
+    outgoingStarted    bool
+    idToBalancerConfig map[internal.Locality]*subBalancerWithConfig
+
+    // incomingMu is to make sure this balancer group doesn't send updates to
+    // cc after it's closed.
+    //
+    // We don't share one mutex for both directions to avoid deadlocks (e.g. a
+    // call to a sub-balancer may call back into the balancer group inline,
+    // which causes a deadlock if both calls require the same mutex).
+    //
+    // We should never need to hold multiple locks at the same time in this
+    // struct. The case where two locks are held can only happen when the
+    // underlying balancer calls back into balancer group inline. So there's an
+    // implicit lock acquisition order: outgoingMu is locked before incomingMu.

+    // incomingMu guards all operations in the direction:
+    // Sub-balancer-->ClientConn. Including NewSubConn, RemoveSubConn, and
+    // updatePicker. It also guards the map from SubConn to balancer ID, so
+    // handleSubConnStateChange needs to hold it briefly to find the
+    // sub-balancer to forward the update.
+    //
+    // The corresponding boolean incomingStarted is used to stop further updates
+    // from sub-balancers after they are closed.
+    incomingMu      sync.Mutex
+    incomingStarted bool // This boolean only guards calls back to ClientConn.
+    scToID          map[balancer.SubConn]internal.Locality
+    // All balancer IDs exist as keys in this map, even if balancer group is not
+    // started.
+    //
+    // If an ID is not in map, it's either removed or never added.
     idToPickerState map[internal.Locality]*pickerState
 }

 func newBalancerGroup(cc balancer.ClientConn, loadStore lrs.Store) *balancerGroup {
     return &balancerGroup{
-        cc: cc,
+        cc:        cc,
+        loadStore: loadStore,
+
+        idToBalancerConfig: make(map[internal.Locality]*subBalancerWithConfig),
+        scToID:             make(map[balancer.SubConn]internal.Locality),
+        idToPickerState:    make(map[internal.Locality]*pickerState),
+    }
+}
+
+func (bg *balancerGroup) start() {
+    bg.incomingMu.Lock()
+    bg.incomingStarted = true
+    bg.incomingMu.Unlock()

-        scToID:          make(map[balancer.SubConn]internal.Locality),
-        idToBalancer:    make(map[internal.Locality]balancer.Balancer),
-        idToPickerState: make(map[internal.Locality]*pickerState),
-        loadStore:       loadStore,
+    bg.outgoingMu.Lock()
+    if bg.outgoingStarted {
+        bg.outgoingMu.Unlock()
+        return
+    }
+
+    for id, config := range bg.idToBalancerConfig {
+        config.startBalancer(&balancerGroupCC{
+            ClientConn: bg.cc,
+            id:         id,
+            group:      bg,
+        })
     }
+    bg.outgoingStarted = true
+    bg.outgoingMu.Unlock()
 }

 // add adds a balancer built by builder to the group, with given id and weight.
@@ -89,23 +218,10 @@ func (bg *balancerGroup) add(id internal.Locality, weight uint32, builder balanc
         grpclog.Errorf("balancerGroup.add called with weight 0, locality: %v. Locality is not added to balancer group", id)
         return
     }
-    bg.mu.Lock()
-    if _, ok := bg.idToBalancer[id]; ok {
-        bg.mu.Unlock()
-        grpclog.Warningf("balancer group: adding a balancer with existing ID: %s", id)
-        return
-    }
-    bg.mu.Unlock()
-    bgcc := &balancerGroupCC{
-        id:    id,
-        group: bg,
-    }
-    b := builder.Build(bgcc, balancer.BuildOptions{})
-    bg.mu.Lock()
-    bg.idToBalancer[id] = b
-    bg.mu.Unlock()

-    bg.pickerMu.Lock()
+    // First, add things to the picker map. Do this even if incomingStarted is
+    // false, because the data is static.
+    bg.incomingMu.Lock()
     bg.idToPickerState[id] = &pickerState{
         weight: weight,
         // Start everything in IDLE. It doesn't affect the overall state
@@ -113,7 +229,30 @@ func (bg *balancerGroup) add(id internal.Locality, weight uint32, builder balanc
         // READY, 1 READY results in overall READY).
         state: connectivity.Idle,
     }
-    bg.pickerMu.Unlock()
+    bg.incomingMu.Unlock()
+
+    // Store data in static map, and then check to see if bg is started.
+    bg.outgoingMu.Lock()
+    if _, ok := bg.idToBalancerConfig[id]; ok {
+        bg.outgoingMu.Unlock()
+        grpclog.Warningf("balancer group: adding a balancer with existing ID: %s", id)
+        return
+    }
+    sbc := &subBalancerWithConfig{
+        builder: builder,
+    }
+    bg.idToBalancerConfig[id] = sbc
+
+    if bg.outgoingStarted {
+        // Only start the balancer if bg is started. Otherwise, we only keep the
+        // static data.
+        sbc.startBalancer(&balancerGroupCC{
+            ClientConn: bg.cc,
+            id:         id,
+            group:      bg,
+        })
+    }
+    bg.outgoingMu.Unlock()
 }

 // remove removes the balancer with id from the group, and closes the balancer.
@@ -121,28 +260,40 @@ func (bg *balancerGroup) add(id internal.Locality, weight uint32, builder balanc
 // It also removes the picker generated from this balancer from the picker
 // group. It always results in a picker update.
 func (bg *balancerGroup) remove(id internal.Locality) {
-    bg.mu.Lock()
-    // Close balancer.
-    if b, ok := bg.idToBalancer[id]; ok {
-        b.Close()
-        delete(bg.idToBalancer, id)
+    bg.outgoingMu.Lock()
+    if bg.outgoingStarted {
+        if config, ok := bg.idToBalancerConfig[id]; ok {
+            config.stopBalancer()
+        }
     }
+    delete(bg.idToBalancerConfig, id)
+    bg.outgoingMu.Unlock()
+
+    bg.incomingMu.Lock()
     // Remove SubConns.
+    //
+    // NOTE: if NewSubConn is called by this (closed) balancer later, the
+    // SubConn will be leaked. This shouldn't happen if the balancer
+    // implementation is correct. To make sure this never happens, we need to
+    // add another layer (balancer manager) between balancer group and the
+    // sub-balancers.
     for sc, bid := range bg.scToID {
         if bid == id {
             bg.cc.RemoveSubConn(sc)
             delete(bg.scToID, sc)
         }
     }
-    bg.mu.Unlock()

-    bg.pickerMu.Lock()
     // Remove id and picker from picker map. This also results in future updates
     // for this ID to be ignored.
     delete(bg.idToPickerState, id)
-    // Update state and picker to reflect the changes.
-    bg.cc.UpdateBalancerState(buildPickerAndState(bg.idToPickerState))
-    bg.pickerMu.Unlock()
+    if bg.incomingStarted {
+        // Normally a picker update is triggered by SubConn state change. But
+        // we want to update state and picker to reflect the changes here, too,
+        // because we don't want `ClientConn` to pick this sub-balancer
+        // anymore.
+        bg.cc.UpdateBalancerState(buildPickerAndState(bg.idToPickerState))
+    }
+    bg.incomingMu.Unlock()
 }

 // changeWeight changes the weight of the balancer.
@@ -157,8 +308,8 @@ func (bg *balancerGroup) changeWeight(id internal.Locality, newWeight uint32) {
         grpclog.Errorf("balancerGroup.changeWeight called with newWeight 0. Weight is not changed")
         return
     }
-    bg.pickerMu.Lock()
-    defer bg.pickerMu.Unlock()
+    bg.incomingMu.Lock()
+    defer bg.incomingMu.Unlock()
     pState, ok := bg.idToPickerState[id]
     if !ok {
         return
@@ -167,8 +318,12 @@ func (bg *balancerGroup) changeWeight(id internal.Locality, newWeight uint32) {
         return
     }
     pState.weight = newWeight
-    // Update state and picker to reflect the changes.
-    bg.cc.UpdateBalancerState(buildPickerAndState(bg.idToPickerState))
+    if bg.incomingStarted {
+        // Normally a picker update is triggered by SubConn state change. But
+        // we want to update state and picker to reflect the changes here, too,
+        // because `ClientConn` should pick with the new weights now.
+        bg.cc.UpdateBalancerState(buildPickerAndState(bg.idToPickerState))
+    }
 }

 // Following are actions from the parent grpc.ClientConn, forwarded to sub-balancers.
@@ -176,41 +331,32 @@

 // SubConn state change: find the corresponding balancer and then forward.
 func (bg *balancerGroup) handleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
     grpclog.Infof("balancer group: handle subconn state change: %p, %v", sc, state)
-    bg.mu.Lock()
-    var b balancer.Balancer
-    if id, ok := bg.scToID[sc]; ok {
-        if state == connectivity.Shutdown {
-            // Only delete sc from the map when state changed to Shutdown.
-            delete(bg.scToID, sc)
-        }
-        b = bg.idToBalancer[id]
-    }
-    bg.mu.Unlock()
-    if b == nil {
-        grpclog.Infof("balancer group: balancer not found for sc state change")
+    bg.incomingMu.Lock()
+    id, ok := bg.scToID[sc]
+    if !ok {
+        bg.incomingMu.Unlock()
         return
     }
-    if ub, ok := b.(balancer.V2Balancer); ok {
-        ub.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: state})
-    } else {
-        b.HandleSubConnStateChange(sc, state)
+    if state == connectivity.Shutdown {
+        // Only delete sc from the map when state changes to Shutdown.
+        delete(bg.scToID, sc)
     }
+    bg.incomingMu.Unlock()
+
+    bg.outgoingMu.Lock()
+    if config, ok := bg.idToBalancerConfig[id]; ok {
+        config.handleSubConnStateChange(sc, state)
+    }
+    bg.outgoingMu.Unlock()
 }

 // Address change: forward to balancer.
 func (bg *balancerGroup) handleResolvedAddrs(id internal.Locality, addrs []resolver.Address) {
-    bg.mu.Lock()
-    b, ok := bg.idToBalancer[id]
-    bg.mu.Unlock()
-    if !ok {
-        grpclog.Infof("balancer group: balancer with id %q not found", id)
-        return
-    }
-    if ub, ok := b.(balancer.V2Balancer); ok {
-        ub.UpdateClientConnState(balancer.ClientConnState{ResolverState: resolver.State{Addresses: addrs}})
-    } else {
-        b.HandleResolvedAddrs(addrs, nil)
+    bg.outgoingMu.Lock()
+    if config, ok := bg.idToBalancerConfig[id]; ok {
+        config.updateAddrs(addrs)
     }
+    bg.outgoingMu.Unlock()
 }

 // TODO: handleServiceConfig()
@@ -228,13 +374,19 @@ func (bg *balancerGroup) handleResolvedAddrs(id internal.Locality, addrs []resol
 // it's just forwarding the action, there's no need for a removeSubConn()
 // wrapper function.
 func (bg *balancerGroup) newSubConn(id internal.Locality, addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
+    bg.incomingMu.Lock()
+    defer bg.incomingMu.Unlock()
+    if !bg.incomingStarted {
+        return nil, fmt.Errorf("NewSubConn is called after balancer is closed")
+    }
+    // NOTE: if the balancer with this id was already removed, this should also
+    // return an error. But since we call stopBalancer when removing the
+    // balancer, this shouldn't happen.
     sc, err := bg.cc.NewSubConn(addrs, opts)
     if err != nil {
        return nil, err
     }
-    bg.mu.Lock()
     bg.scToID[sc] = id
-    bg.mu.Unlock()
     return sc, nil
 }

@@ -242,8 +394,9 @@ func (bg *balancerGroup) newSubConn(id internal.Locality, addrs []resolver.Addre
 // connectivity state, then forward to ClientConn.
 func (bg *balancerGroup) updateBalancerState(id internal.Locality, state connectivity.State, picker balancer.Picker) {
     grpclog.Infof("balancer group: update balancer state: %v, %v, %p", id, state, picker)
-    bg.pickerMu.Lock()
-    defer bg.pickerMu.Unlock()
+
+    bg.incomingMu.Lock()
+    defer bg.incomingMu.Unlock()
     pickerSt, ok := bg.idToPickerState[id]
     if !ok {
         // All state starts in IDLE. If ID is not in map, it's either removed,
@@ -253,19 +406,39 @@ func (bg *balancerGroup) updateBalancerState(id internal.Locality, state connect
         return
     }
     pickerSt.picker = newLoadReportPicker(picker, id, bg.loadStore)
     pickerSt.state = state
-    bg.cc.UpdateBalancerState(buildPickerAndState(bg.idToPickerState))
+    if bg.incomingStarted {
+        bg.cc.UpdateBalancerState(buildPickerAndState(bg.idToPickerState))
+    }
 }

 func (bg *balancerGroup) close() {
-    bg.mu.Lock()
-    for _, b := range bg.idToBalancer {
-        b.Close()
+    bg.incomingMu.Lock()
+    if bg.incomingStarted {
+        bg.incomingStarted = false
+
+        for _, pState := range bg.idToPickerState {
+            // Reset everything to IDLE but keep the entry in map (to keep the
+            // weight).
+            pState.picker = nil
+            pState.state = connectivity.Idle
+        }
+
+        // Also remove all SubConns.
+        for sc := range bg.scToID {
+            bg.cc.RemoveSubConn(sc)
+            delete(bg.scToID, sc)
+        }
     }
-    // Also remove all SubConns.
-    for sc := range bg.scToID {
-        bg.cc.RemoveSubConn(sc)
+    bg.incomingMu.Unlock()
+
+    bg.outgoingMu.Lock()
+    if bg.outgoingStarted {
+        bg.outgoingStarted = false
+        for _, config := range bg.idToBalancerConfig {
+            config.stopBalancer()
+        }
     }
-    bg.mu.Unlock()
+    bg.outgoingMu.Unlock()
 }

 func buildPickerAndState(m map[internal.Locality]*pickerState) (connectivity.State, balancer.Picker) {
@@ -382,6 +555,7 @@ func (lrp *loadReportPicker) Pick(ctx context.Context, opts balancer.PickOptions
 // Some of the actions are forwarded to the parent ClientConn with no change.
 // Some are forwarded to the balancer group with the sub-balancer ID.
 type balancerGroupCC struct {
+    balancer.ClientConn
     id    internal.Locality
     group *balancerGroup
 }
@@ -389,15 +563,6 @@ type balancerGroupCC struct {
 func (bgcc *balancerGroupCC) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
     return bgcc.group.newSubConn(bgcc.id, addrs, opts)
 }
-func (bgcc *balancerGroupCC) RemoveSubConn(sc balancer.SubConn) {
-    bgcc.group.cc.RemoveSubConn(sc)
-}
 func (bgcc *balancerGroupCC) UpdateBalancerState(state connectivity.State, picker balancer.Picker) {
     bgcc.group.updateBalancerState(bgcc.id, state, picker)
 }
-func (bgcc *balancerGroupCC) ResolveNow(opt resolver.ResolveNowOption) {
-    bgcc.group.cc.ResolveNow(opt)
-}
-func (bgcc *balancerGroupCC) Target() string {
-    return bgcc.group.cc.Target()
-}
diff --git a/xds/internal/balancer/edsbalancer/balancergroup_test.go b/xds/internal/balancer/edsbalancer/balancergroup_test.go
index 748eb0b64a4..1f46b6aaee7 100644
--- a/xds/internal/balancer/edsbalancer/balancergroup_test.go
+++ b/xds/internal/balancer/edsbalancer/balancergroup_test.go
@@ -39,6 +39,7 @@ var (
 func TestBalancerGroup_OneRR_AddRemoveBackend(t *testing.T) {
     cc := newTestClientConn(t)
     bg := newBalancerGroup(cc, nil)
+    bg.start()

     // Add one balancer to group.
     bg.add(testBalancerIDs[0], 1, rrBuilder)
@@ -98,6 +99,7 @@ func TestBalancerGroup_OneRR_AddRemoveBackend(t *testing.T) {
 func TestBalancerGroup_TwoRR_OneBackend(t *testing.T) {
     cc := newTestClientConn(t)
     bg := newBalancerGroup(cc, nil)
+    bg.start()

     // Add two balancers to group and send one resolved address to both
     // balancers.
@@ -130,6 +132,7 @@ func TestBalancerGroup_TwoRR_OneBackend(t *testing.T) {
 func TestBalancerGroup_TwoRR_MoreBackends(t *testing.T) {
     cc := newTestClientConn(t)
     bg := newBalancerGroup(cc, nil)
+    bg.start()

     // Add two balancers to group and send one resolved address to both
     // balancers.
@@ -226,6 +229,7 @@ func TestBalancerGroup_TwoRR_MoreBackends(t *testing.T) {
 func TestBalancerGroup_TwoRR_DifferentWeight_MoreBackends(t *testing.T) {
     cc := newTestClientConn(t)
     bg := newBalancerGroup(cc, nil)
+    bg.start()

     // Add two balancers to group and send two resolved addresses to both
     // balancers.
@@ -264,6 +268,7 @@ func TestBalancerGroup_TwoRR_DifferentWeight_MoreBackends(t *testing.T) {
 func TestBalancerGroup_ThreeRR_RemoveBalancer(t *testing.T) {
     cc := newTestClientConn(t)
     bg := newBalancerGroup(cc, nil)
+    bg.start()

     // Add three balancers to group and send one resolved address to both
     // balancers.
@@ -331,6 +336,7 @@ func TestBalancerGroup_ThreeRR_RemoveBalancer(t *testing.T) {
 func TestBalancerGroup_TwoRR_ChangeWeight_MoreBackends(t *testing.T) {
     cc := newTestClientConn(t)
     bg := newBalancerGroup(cc, nil)
+    bg.start()

     // Add two balancers to group and send two resolved addresses to both
     // balancers.
@@ -382,6 +388,7 @@
 func TestBalancerGroup_LoadReport(t *testing.T) {
     cc := newTestClientConn(t)
     bg := newBalancerGroup(cc, testLoadStore)
+    bg.start()

     backendToBalancerID := make(map[balancer.SubConn]internal.Locality)
@@ -450,3 +457,114 @@ func TestBalancerGroup_LoadReport(t *testing.T) {
         t.Fatalf("want cost: %v, got: %v", testLoadStore.callsCost, wantCost)
     }
 }
+
+// Create a new balancer group, add balancers and backends, but don't start it.
+// - b1, weight 2, backends [0,1]
+// - b2, weight 1, backends [2,3]
+// Start the balancer group and check behavior.
+//
+// Close the balancer group, and call add/remove/change weight/change address:
+// - b2, weight 3, backends [0,3]
+// - b3, weight 1, backends [1,2]
+// Start the balancer group again and check behavior.
+func TestBalancerGroup_start_close(t *testing.T) {
+    cc := newTestClientConn(t)
+    bg := newBalancerGroup(cc, nil)
+
+    // Add two balancers to group and send two resolved addresses to both
+    // balancers.
+    bg.add(testBalancerIDs[0], 2, rrBuilder)
+    bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2])
+    bg.add(testBalancerIDs[1], 1, rrBuilder)
+    bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4])
+
+    bg.start()
+
+    m1 := make(map[resolver.Address]balancer.SubConn)
+    for i := 0; i < 4; i++ {
+        addrs := <-cc.newSubConnAddrsCh
+        sc := <-cc.newSubConnCh
+        m1[addrs[0]] = sc
+        bg.handleSubConnStateChange(sc, connectivity.Connecting)
+        bg.handleSubConnStateChange(sc, connectivity.Ready)
+    }
+
+    // Test roundrobin on the last picker.
+    p1 := <-cc.newPickerCh
+    want := []balancer.SubConn{
+        m1[testBackendAddrs[0]], m1[testBackendAddrs[0]],
+        m1[testBackendAddrs[1]], m1[testBackendAddrs[1]],
+        m1[testBackendAddrs[2]], m1[testBackendAddrs[3]],
+    }
+    if err := isRoundRobin(want, func() balancer.SubConn {
+        sc, _, _ := p1.Pick(context.Background(), balancer.PickOptions{})
+        return sc
+    }); err != nil {
+        t.Fatalf("want %v, got %v", want, err)
+    }
+
+    bg.close()
+    for i := 0; i < 4; i++ {
+        bg.handleSubConnStateChange(<-cc.removeSubConnCh, connectivity.Shutdown)
+    }
+
+    // Add b3, weight 1, backends [1,2].
+    bg.add(testBalancerIDs[2], 1, rrBuilder)
+    bg.handleResolvedAddrs(testBalancerIDs[2], testBackendAddrs[1:3])
+
+    // Remove b1.
+    bg.remove(testBalancerIDs[0])
+
+    // Update b2 to weight 3, backends [0,3].
+    bg.changeWeight(testBalancerIDs[1], 3)
+    bg.handleResolvedAddrs(testBalancerIDs[1], append([]resolver.Address(nil), testBackendAddrs[0], testBackendAddrs[3]))
+
+    bg.start()
+
+    m2 := make(map[resolver.Address]balancer.SubConn)
+    for i := 0; i < 4; i++ {
+        addrs := <-cc.newSubConnAddrsCh
+        sc := <-cc.newSubConnCh
+        m2[addrs[0]] = sc
+        bg.handleSubConnStateChange(sc, connectivity.Connecting)
+        bg.handleSubConnStateChange(sc, connectivity.Ready)
+    }
+
+    // Test roundrobin on the last picker.
+    p2 := <-cc.newPickerCh
+    want = []balancer.SubConn{
+        m2[testBackendAddrs[0]], m2[testBackendAddrs[0]], m2[testBackendAddrs[0]],
+        m2[testBackendAddrs[3]], m2[testBackendAddrs[3]], m2[testBackendAddrs[3]],
+        m2[testBackendAddrs[1]], m2[testBackendAddrs[2]],
+    }
+    if err := isRoundRobin(want, func() balancer.SubConn {
+        sc, _, _ := p2.Pick(context.Background(), balancer.PickOptions{})
+        return sc
+    }); err != nil {
+        t.Fatalf("want %v, got %v", want, err)
+    }
+}
+
+// Test that balancer group start() doesn't deadlock if the balancer calls back
+// into balancer group inline when it gets an update.
+//
+// The potential deadlock can happen if we:
+// - hold a lock and send updates to balancer (e.g. update resolved addresses)
+// - the balancer calls back (NewSubConn or update picker) inline
+// The callback will try to hold the same lock again, which will cause a
+// deadlock.
+//
+// This test starts the balancer group with a test balancer that updates the
+// picker whenever it gets an address update. It's expected that start()
+// doesn't block because of a deadlock.
+func TestBalancerGroup_start_close_deadlock(t *testing.T) {
+    cc := newTestClientConn(t)
+    bg := newBalancerGroup(cc, nil)
+
+    bg.add(testBalancerIDs[0], 2, &testConstBalancerBuilder{})
+    bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2])
+    bg.add(testBalancerIDs[1], 1, &testConstBalancerBuilder{})
+    bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4])
+
+    bg.start()
+}
diff --git a/xds/internal/balancer/edsbalancer/edsbalancer.go b/xds/internal/balancer/edsbalancer/edsbalancer.go
index 0038830d4aa..43866f15f08 100644
--- a/xds/internal/balancer/edsbalancer/edsbalancer.go
+++ b/xds/internal/balancer/edsbalancer/edsbalancer.go
@@ -176,6 +176,7 @@ func (xdsB *EDSBalancer) HandleEDSResponse(edsResp *edspb.ClusterLoadAssignment)
     // response).
     if xdsB.bg == nil {
         xdsB.bg = newBalancerGroup(xdsB, xdsB.loadStore)
+        xdsB.bg.start()
     }

     // TODO: Unhandled fields from EDS response:
diff --git a/xds/internal/balancer/edsbalancer/test_util_test.go b/xds/internal/balancer/edsbalancer/test_util_test.go
index 86c7dfb7965..6fc99c82891 100644
--- a/xds/internal/balancer/edsbalancer/test_util_test.go
+++ b/xds/internal/balancer/edsbalancer/test_util_test.go
@@ -28,9 +28,17 @@ import (
     "google.golang.org/grpc/xds/internal"
 )

-var (
-    testSubConns = []*testSubConn{{id: "sc1"}, {id: "sc2"}, {id: "sc3"}, {id: "sc4"}}
-)
+const testSubConnsCount = 8
+
+var testSubConns []*testSubConn
+
+func init() {
+    for i := 0; i < testSubConnsCount; i++ {
+        testSubConns = append(testSubConns, &testSubConn{
+            id: fmt.Sprintf("sc%d", i),
+        })
+    }
+}

 type testSubConn struct {
     id string
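
The static/dynamic split described in the commit message can be seen in isolation in the toy program below. This is a minimal sketch under invented names (group, staticConfig, and running are not grpc-go identifiers); only the semantics mirror the patch: add() records static data at any time, start() builds the dynamic state from it, and close() drops the dynamic state while keeping the static data, so a later start() can rebuild every sub-balancer.

package main

import "fmt"

// staticConfig mirrors the "static" half: data that survives close().
type staticConfig struct {
	weight uint32
	addrs  []string
}

// group keeps static configs at all times, and dynamic state only while
// started (a stand-in for the built sub-balancers).
type group struct {
	started bool
	configs map[string]*staticConfig // static: kept across start/close
	running map[string]bool          // dynamic: exists only while started
}

func newGroup() *group {
	return &group{
		configs: make(map[string]*staticConfig),
		running: make(map[string]bool),
	}
}

// add always records the static data, but only builds the dynamic part when
// the group is already started.
func (g *group) add(id string, weight uint32, addrs []string) {
	g.configs[id] = &staticConfig{weight: weight, addrs: addrs}
	if g.started {
		g.running[id] = true
	}
}

// start builds dynamic state for every stored config; it is a no-op if the
// group is already started.
func (g *group) start() {
	if g.started {
		return
	}
	g.started = true
	for id := range g.configs {
		g.running[id] = true
	}
}

// close drops only the dynamic state. The static configs stay, so a later
// start() restores every sub-balancer.
func (g *group) close() {
	if !g.started {
		return
	}
	g.started = false
	g.running = make(map[string]bool)
}

func main() {
	g := newGroup()
	g.add("locality-a", 2, []string{"addr1", "addr2"})    // static only: not started yet
	fmt.Println("running before start:", len(g.running)) // 0

	g.start()
	fmt.Println("running after start:", len(g.running)) // 1

	g.close()                                 // dynamic state dropped, static kept
	g.add("locality-b", 1, []string{"addr3"}) // accepted while closed

	g.start()
	fmt.Println("running after restart:", len(g.running)) // 2
}

A close()/start() cycle loses nothing: the second start() brings up both the locality added before close() and the one added while the group was stopped, which is the behavior TestBalancerGroup_start_close verifies against the real implementation.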
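
The two-mutex scheme in balancerGroup exists because a sub-balancer may call back into the group inline, before an outgoing call returns. The sketch below (invented names, not the patch's real types or signatures) shows why a single non-reentrant sync.Mutex would self-deadlock on such a callback, and how the implicit acquisition order (outgoingMu before incomingMu, never the reverse) keeps two mutexes safe.

package main

import (
	"fmt"
	"sync"
)

// grp holds one mutex per direction, like outgoingMu/incomingMu in the patch.
type grp struct {
	outgoingMu sync.Mutex // guards ClientConn --> sub-balancer calls
	incomingMu sync.Mutex // guards sub-balancer --> ClientConn callbacks
}

// updateAddrs holds outgoingMu while calling into the sub-balancer, which may
// call straight back into the group before returning.
func (g *grp) updateAddrs() {
	g.outgoingMu.Lock()
	defer g.outgoingMu.Unlock()
	g.subBalancerGotUpdate()
}

// subBalancerGotUpdate stands in for a sub-balancer that reacts to an address
// update by creating a SubConn inline, as the deadlock test exercises.
func (g *grp) subBalancerGotUpdate() {
	g.newSubConn()
}

// newSubConn takes incomingMu, never outgoingMu. Had updateAddrs and
// newSubConn shared one mutex, this inline callback would block forever,
// because sync.Mutex is not reentrant.
func (g *grp) newSubConn() {
	g.incomingMu.Lock()
	defer g.incomingMu.Unlock()
	fmt.Println("SubConn created under incomingMu")
}

func main() {
	g := &grp{}
	g.updateAddrs() // returns without deadlocking
}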
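
Throughout the patch, updates are forwarded with a type assertion on balancer.V2Balancer, preferring the newer API and falling back to the v1 methods. The pattern is ordinary Go interface upgrading; here is a self-contained sketch with invented interfaces (balancerV1 and balancerV2 are not the real gRPC types):

package main

import "fmt"

type balancerV1 interface {
	HandleResolvedAddrs(addrs []string)
}

type balancerV2 interface {
	UpdateClientConnState(addrs []string)
}

type oldBalancer struct{}

func (oldBalancer) HandleResolvedAddrs(addrs []string) { fmt.Println("v1 path:", addrs) }

// newBalancer embeds oldBalancer, so it satisfies both interfaces.
type newBalancer struct{ oldBalancer }

func (newBalancer) UpdateClientConnState(addrs []string) { fmt.Println("v2 path:", addrs) }

// forwardAddrs prefers the newer interface when the balancer implements it,
// mirroring the `if ub, ok := b.(balancer.V2Balancer); ok` dispatch above.
func forwardAddrs(b balancerV1, addrs []string) {
	if ub, ok := b.(balancerV2); ok {
		ub.UpdateClientConnState(addrs)
		return
	}
	b.HandleResolvedAddrs(addrs)
}

func main() {
	forwardAddrs(oldBalancer{}, []string{"10.0.0.1:443"})
	forwardAddrs(newBalancer{}, []string{"10.0.0.2:443"})
}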