Skip to content

Commit

Permalink
xds: update balancer group ID type to string (#3862)
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Sep 2, 2020
1 parent 9a132e4 commit 8630cac
Show file tree
Hide file tree
Showing 10 changed files with 65 additions and 94 deletions.
19 changes: 9 additions & 10 deletions xds/internal/balancer/balancergroup/balancergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"google.golang.org/grpc/internal/cache"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal"
)

// loadReporter wraps the methods from the loadStore that are used here.
Expand Down Expand Up @@ -58,7 +57,7 @@ type subBalancerWrapper struct {
// of the actions are forwarded to the parent ClientConn with no change.
// Some are forward to balancer group with the sub-balancer ID.
balancer.ClientConn
id internal.LocalityID
id string
group *BalancerGroup

mu sync.Mutex
Expand Down Expand Up @@ -202,7 +201,7 @@ type BalancerGroup struct {
// to sub-balancers after they are closed.
outgoingMu sync.Mutex
outgoingStarted bool
idToBalancerConfig map[internal.LocalityID]*subBalancerWrapper
idToBalancerConfig map[string]*subBalancerWrapper
// Cache for sub-balancers when they are removed.
balancerCache *cache.TimeoutCache

Expand Down Expand Up @@ -250,7 +249,7 @@ func New(cc balancer.ClientConn, stateAggregator BalancerStateAggregator, loadSt

stateAggregator: stateAggregator,

idToBalancerConfig: make(map[internal.LocalityID]*subBalancerWrapper),
idToBalancerConfig: make(map[string]*subBalancerWrapper),
balancerCache: cache.NewTimeoutCache(DefaultSubBalancerCloseTimeout),
scToSubBalancer: make(map[balancer.SubConn]*subBalancerWrapper),
}
Expand Down Expand Up @@ -281,7 +280,7 @@ func (bg *BalancerGroup) Start() {
}

// Add adds a balancer built by builder to the group, with given id.
func (bg *BalancerGroup) Add(id internal.LocalityID, builder balancer.Builder) {
func (bg *BalancerGroup) Add(id string, builder balancer.Builder) {
// Store data in static map, and then check to see if bg is started.
bg.outgoingMu.Lock()
var sbc *subBalancerWrapper
Expand Down Expand Up @@ -332,7 +331,7 @@ func (bg *BalancerGroup) Add(id internal.LocalityID, builder balancer.Builder) {
// But doesn't close the balancer. The balancer is kept in a cache, and will be
// closed after timeout. Cleanup work (closing sub-balancer and removing
// subconns) will be done after timeout.
func (bg *BalancerGroup) Remove(id internal.LocalityID) {
func (bg *BalancerGroup) Remove(id string) {
bg.outgoingMu.Lock()
if sbToRemove, ok := bg.idToBalancerConfig[id]; ok {
if bg.outgoingStarted {
Expand Down Expand Up @@ -400,7 +399,7 @@ func (bg *BalancerGroup) UpdateSubConnState(sc balancer.SubConn, state balancer.

// UpdateClientConnState handles ClientState (including balancer config and
// addresses) from resolver. It finds the balancer and forwards the update.
func (bg *BalancerGroup) UpdateClientConnState(id internal.LocalityID, s balancer.ClientConnState) error {
func (bg *BalancerGroup) UpdateClientConnState(id string, s balancer.ClientConnState) error {
bg.outgoingMu.Lock()
defer bg.outgoingMu.Unlock()
if config, ok := bg.idToBalancerConfig[id]; ok {
Expand Down Expand Up @@ -449,7 +448,7 @@ func (bg *BalancerGroup) newSubConn(config *subBalancerWrapper, addrs []resolver
// updateBalancerState: forward the new state to balancer state aggregator. The
// aggregator will create an aggregated picker and an aggregated connectivity
// state, then forward to ClientConn.
func (bg *BalancerGroup) updateBalancerState(id internal.LocalityID, state balancer.State) {
func (bg *BalancerGroup) updateBalancerState(id string, state balancer.State) {
bg.logger.Infof("Balancer state update from locality %v, new state: %+v", id, state)
if bg.loadStore != nil {
// Only wrap the picker to do load reporting if loadStore was set.
Expand Down Expand Up @@ -504,10 +503,10 @@ type loadReportPicker struct {
loadStore loadReporter
}

func newLoadReportPicker(p balancer.Picker, id internal.LocalityID, loadStore loadReporter) *loadReportPicker {
func newLoadReportPicker(p balancer.Picker, id string, loadStore loadReporter) *loadReportPicker {
return &loadReportPicker{
p: p,
locality: id.String(),
locality: id,
loadStore: loadStore,
}
}
Expand Down
9 changes: 4 additions & 5 deletions xds/internal/balancer/balancergroup/balancergroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/weightedtarget/weightedaggregator"
"google.golang.org/grpc/xds/internal/client/load"
"google.golang.org/grpc/xds/internal/testutils"
Expand All @@ -48,7 +47,7 @@ import (
var (
rrBuilder = balancer.Get(roundrobin.Name)
pfBuilder = balancer.Get(grpc.PickFirstBalancerName)
testBalancerIDs = []internal.LocalityID{{Region: "b1"}, {Region: "b2"}, {Region: "b3"}}
testBalancerIDs = []string{"b1", "b2", "b3"}
testBackendAddrs []resolver.Address
)

Expand Down Expand Up @@ -404,7 +403,7 @@ func (s) TestBalancerGroup_LoadReport(t *testing.T) {
loadStore := &load.Store{}
cc, gator, bg := newTestBalancerGroup(t, loadStore)

backendToBalancerID := make(map[balancer.SubConn]internal.LocalityID)
backendToBalancerID := make(map[balancer.SubConn]string)

// Add two balancers to group and send two resolved addresses to both
// balancers.
Expand Down Expand Up @@ -443,7 +442,7 @@ func (s) TestBalancerGroup_LoadReport(t *testing.T) {
// these to show up as pending rpcs.
wantStoreData := &load.Data{
LocalityStats: map[string]load.LocalityData{
testBalancerIDs[0].String(): {
testBalancerIDs[0]: {
RequestStats: load.RequestData{Succeeded: 10, InProgress: 10},
LoadStats: map[string]load.ServerLoadData{
"cpu_utilization": {Count: 10, Sum: 100},
Expand All @@ -452,7 +451,7 @@ func (s) TestBalancerGroup_LoadReport(t *testing.T) {
"piu": {Count: 10, Sum: 31.4},
},
},
testBalancerIDs[1].String(): {
testBalancerIDs[1]: {
RequestStats: load.RequestData{Succeeded: 10},
LoadStats: map[string]load.ServerLoadData{
"cpu_utilization": {Count: 10, Sum: 100},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package balancergroup

import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/xds/internal"
)

// BalancerStateAggregator aggregates sub-picker and connectivity states into a
Expand All @@ -34,5 +33,5 @@ type BalancerStateAggregator interface {
//
// It's up to the implementation whether this will trigger an update to the
// parent ClientConn.
UpdateState(id internal.LocalityID, state balancer.State)
UpdateState(id string, state balancer.State)
}
20 changes: 10 additions & 10 deletions xds/internal/balancer/edsbalancer/eds_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,13 @@ func (edsImpl *edsBalancerImpl) handleChildPolicy(name string, config json.RawMe
if bgwc == nil {
continue
}
for id, config := range bgwc.configs {
for lid, config := range bgwc.configs {
// TODO: (eds) add support to balancer group to support smoothly
// switching sub-balancers (keep old balancer around until new
// balancer becomes ready).
bgwc.bg.Remove(id)
bgwc.bg.Add(id, edsImpl.subBalancerBuilder)
bgwc.bg.UpdateClientConnState(id, balancer.ClientConnState{
bgwc.bg.Remove(lid.String())
bgwc.bg.Add(lid.String(), edsImpl.subBalancerBuilder)
bgwc.bg.UpdateClientConnState(lid.String(), balancer.ClientConnState{
ResolverState: resolver.State{Addresses: config.addrs},
})
// This doesn't need to manually update picker, because the new
Expand Down Expand Up @@ -315,8 +315,8 @@ func (edsImpl *edsBalancerImpl) handleEDSResponsePerPriority(bgwc *balancerGroup
config, ok := bgwc.configs[lid]
if !ok {
// A new balancer, add it to balancer group and balancer map.
bgwc.stateAggregator.Add(lid, newWeight)
bgwc.bg.Add(lid, edsImpl.subBalancerBuilder)
bgwc.stateAggregator.Add(lid.String(), newWeight)
bgwc.bg.Add(lid.String(), edsImpl.subBalancerBuilder)
config = &localityConfig{
weight: newWeight,
}
Expand All @@ -339,13 +339,13 @@ func (edsImpl *edsBalancerImpl) handleEDSResponsePerPriority(bgwc *balancerGroup

if weightChanged {
config.weight = newWeight
bgwc.stateAggregator.UpdateWeight(lid, newWeight)
bgwc.stateAggregator.UpdateWeight(lid.String(), newWeight)
rebuildStateAndPicker = true
}

if addrsChanged {
config.addrs = newAddrs
bgwc.bg.UpdateClientConnState(lid, balancer.ClientConnState{
bgwc.bg.UpdateClientConnState(lid.String(), balancer.ClientConnState{
ResolverState: resolver.State{Addresses: newAddrs},
})
}
Expand All @@ -354,8 +354,8 @@ func (edsImpl *edsBalancerImpl) handleEDSResponsePerPriority(bgwc *balancerGroup
// Delete localities that are removed in the latest response.
for lid := range bgwc.configs {
if _, ok := newLocalitiesSet[lid]; !ok {
bgwc.stateAggregator.Remove(lid)
bgwc.bg.Remove(lid)
bgwc.stateAggregator.Remove(lid.String())
bgwc.bg.Remove(lid.String())
delete(bgwc.configs, lid)
edsImpl.logger.Infof("Locality %v deleted", lid)
rebuildStateAndPicker = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/wrr"
"google.golang.org/grpc/xds/internal"
)

type weightedPickerState struct {
Expand Down Expand Up @@ -68,7 +67,7 @@ type Aggregator struct {
// started.
//
// If an ID is not in map, it's either removed or never added.
idToPickerState map[internal.LocalityID]*weightedPickerState
idToPickerState map[string]*weightedPickerState
}

// New creates a new weighted balancer state aggregator.
Expand All @@ -77,7 +76,7 @@ func New(cc balancer.ClientConn, logger *grpclog.PrefixLogger, newWRR func() wrr
cc: cc,
logger: logger,
newWRR: newWRR,
idToPickerState: make(map[internal.LocalityID]*weightedPickerState),
idToPickerState: make(map[string]*weightedPickerState),
}
}

Expand All @@ -100,7 +99,7 @@ func (wbsa *Aggregator) Stop() {

// Add adds a sub-balancer state with weight. It adds a place holder, and waits for
// the real sub-balancer to update state.
func (wbsa *Aggregator) Add(id internal.LocalityID, weight uint32) {
func (wbsa *Aggregator) Add(id string, weight uint32) {
wbsa.mu.Lock()
defer wbsa.mu.Unlock()
wbsa.idToPickerState[id] = &weightedPickerState{
Expand All @@ -118,7 +117,7 @@ func (wbsa *Aggregator) Add(id internal.LocalityID, weight uint32) {

// Remove removes the sub-balancer state. Future updates from this sub-balancer,
// if any, will be ignored.
func (wbsa *Aggregator) Remove(id internal.LocalityID) {
func (wbsa *Aggregator) Remove(id string) {
wbsa.mu.Lock()
defer wbsa.mu.Unlock()
if _, ok := wbsa.idToPickerState[id]; !ok {
Expand All @@ -132,7 +131,7 @@ func (wbsa *Aggregator) Remove(id internal.LocalityID) {
// UpdateWeight updates the weight for the given id. Note that this doesn't
// trigger an update to the parent ClientConn. The caller should decide when
// it's necessary, and call BuildAndUpdate.
func (wbsa *Aggregator) UpdateWeight(id internal.LocalityID, newWeight uint32) {
func (wbsa *Aggregator) UpdateWeight(id string, newWeight uint32) {
wbsa.mu.Lock()
defer wbsa.mu.Unlock()
pState, ok := wbsa.idToPickerState[id]
Expand All @@ -146,7 +145,7 @@ func (wbsa *Aggregator) UpdateWeight(id internal.LocalityID, newWeight uint32) {
// It's usually called by the balancer group.
//
// It calls parent ClientConn's UpdateState with the new aggregated state.
func (wbsa *Aggregator) UpdateState(id internal.LocalityID, newState balancer.State) {
func (wbsa *Aggregator) UpdateState(id string, newState balancer.State) {
wbsa.mu.Lock()
defer wbsa.mu.Unlock()
oldState, ok := wbsa.idToPickerState[id]
Expand Down
21 changes: 6 additions & 15 deletions xds/internal/balancer/weightedtarget/weightedtarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"google.golang.org/grpc/internal/wrr"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/balancergroup"
"google.golang.org/grpc/xds/internal/balancer/weightedtarget/weightedaggregator"
)
Expand Down Expand Up @@ -79,11 +78,6 @@ type weightedTargetBalancer struct {
targets map[string]target
}

// TODO: remove this and use strings directly as keys for balancer group.
func makeLocalityFromName(name string) internal.LocalityID {
return internal.LocalityID{Region: name}
}

// UpdateClientConnState takes the new targets in balancer group,
// creates/deletes sub-balancers and sends them update. Addresses are split into
// groups based on hierarchy path.
Expand All @@ -99,9 +93,8 @@ func (w *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnStat
// Remove sub-pickers and sub-balancers that are not in the new config.
for name := range w.targets {
if _, ok := newConfig.Targets[name]; !ok {
l := makeLocalityFromName(name)
w.stateAggregator.Remove(l)
w.bg.Remove(l)
w.stateAggregator.Remove(name)
w.bg.Remove(name)
// Trigger a state/picker update, because we don't want `ClientConn`
// to pick this sub-balancer anymore.
rebuildStateAndPicker = true
Expand All @@ -114,19 +107,17 @@ func (w *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnStat
//
// For all sub-balancers, forward the address/balancer config update.
for name, newT := range newConfig.Targets {
l := makeLocalityFromName(name)

oldT, ok := w.targets[name]
if !ok {
// If this is a new sub-balancer, add weights to the picker map.
w.stateAggregator.Add(l, newT.Weight)
w.stateAggregator.Add(name, newT.Weight)
// Then add to the balancer group.
w.bg.Add(l, balancer.Get(newT.ChildPolicy.Name))
w.bg.Add(name, balancer.Get(newT.ChildPolicy.Name))
// Not trigger a state/picker update. Wait for the new sub-balancer
// to send its updates.
} else if newT.Weight != oldT.Weight {
// If this is an existing sub-balancer, update weight if necessary.
w.stateAggregator.UpdateWeight(l, newT.Weight)
w.stateAggregator.UpdateWeight(name, newT.Weight)
// Trigger a state/picker update, because we don't want `ClientConn`
// should do picks with the new weights now.
rebuildStateAndPicker = true
Expand All @@ -138,7 +129,7 @@ func (w *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnStat
// - Balancer config comes from the targets map.
//
// TODO: handle error? How to aggregate errors and return?
_ = w.bg.UpdateClientConnState(l, balancer.ClientConnState{
_ = w.bg.UpdateClientConnState(name, balancer.ClientConnState{
ResolverState: resolver.State{
Addresses: addressesSplit[name],
ServiceConfig: s.ResolverState.ServiceConfig,
Expand Down
11 changes: 5 additions & 6 deletions xds/internal/balancer/xdsrouting/balancerstateaggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/xds/internal"
)

type subBalancerState struct {
Expand Down Expand Up @@ -59,14 +58,14 @@ type balancerStateAggregator struct {
// started.
//
// If an ID is not in map, it's either removed or never added.
idToPickerState map[internal.LocalityID]*subBalancerState
idToPickerState map[string]*subBalancerState
}

func newBalancerStateAggregator(cc balancer.ClientConn, logger *grpclog.PrefixLogger) *balancerStateAggregator {
return &balancerStateAggregator{
cc: cc,
logger: logger,
idToPickerState: make(map[internal.LocalityID]*subBalancerState),
idToPickerState: make(map[string]*subBalancerState),
}
}

Expand All @@ -91,7 +90,7 @@ func (rbsa *balancerStateAggregator) close() {
// for the real sub-balancer to update state.
//
// This is called when there's a new action.
func (rbsa *balancerStateAggregator) add(id internal.LocalityID) {
func (rbsa *balancerStateAggregator) add(id string) {
rbsa.mu.Lock()
defer rbsa.mu.Unlock()
rbsa.idToPickerState[id] = &subBalancerState{
Expand All @@ -110,7 +109,7 @@ func (rbsa *balancerStateAggregator) add(id internal.LocalityID) {
// if any, will be ignored.
//
// This is called when an action is removed.
func (rbsa *balancerStateAggregator) remove(id internal.LocalityID) {
func (rbsa *balancerStateAggregator) remove(id string) {
rbsa.mu.Lock()
defer rbsa.mu.Unlock()
if _, ok := rbsa.idToPickerState[id]; !ok {
Expand All @@ -134,7 +133,7 @@ func (rbsa *balancerStateAggregator) updateRoutes(newRoutes []route) {
// It's usually called by the balancer group.
//
// It calls parent ClientConn's UpdateState with the new aggregated state.
func (rbsa *balancerStateAggregator) UpdateState(id internal.LocalityID, state balancer.State) {
func (rbsa *balancerStateAggregator) UpdateState(id string, state balancer.State) {
rbsa.mu.Lock()
defer rbsa.mu.Unlock()
pickerSt, ok := rbsa.idToPickerState[id]
Expand Down

0 comments on commit 8630cac

Please sign in to comment.