Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ringhash: fix normalizeWeights #7156

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 13 additions & 16 deletions xds/internal/balancer/ringhash/ring.go
Expand Up @@ -122,24 +122,21 @@ func newRing(subConns *resolver.AddressMap, minRingSize, maxRingSize uint64, log
// Must be called with a non-empty subConns map.
func normalizeWeights(subConns *resolver.AddressMap) ([]subConnWithWeight, float64) {
var weightSum uint32
keys := subConns.Keys()
for _, a := range keys {
weightSum += getWeightAttribute(a)
scVals := subConns.Values()
for _, a := range scVals {
weightSum += a.(*subConn).weight
}
ret := make([]subConnWithWeight, 0, len(keys))
min := float64(1.0)
for _, a := range keys {
v, _ := subConns.Get(a)
scInfo := v.(*subConn)
// getWeightAttribute() returns 1 if the weight attribute is not found
// on the address. And since this function is guaranteed to be called
// with a non-empty subConns map, weightSum is guaranteed to be
// non-zero. So, we need not worry about divide a by zero error here.
nw := float64(getWeightAttribute(a)) / float64(weightSum)
ret := make([]subConnWithWeight, 0, subConns.Len())
min := 1.0
for _, a := range scVals {
scInfo := a.(*subConn)
// (*subConn).weight returns 1 if the weight attribute is not found on
// the address. And since this function is guaranteed to be called with
// a non-empty subConns map, weightSum is guaranteed to be non-zero. So,
// we need not worry about divide by zero error here.
nw := float64(scInfo.weight) / float64(weightSum)
ret = append(ret, subConnWithWeight{sc: scInfo, weight: nw})
if nw < min {
min = nw
}
min = math.Min(min, nw)
}
// Sort the addresses to return consistent results.
//
Expand Down
6 changes: 3 additions & 3 deletions xds/internal/balancer/ringhash/ring_test.go
Expand Up @@ -38,9 +38,9 @@ func init() {
testAddr("c", 4),
}
testSubConnMap = resolver.NewAddressMap()
testSubConnMap.Set(testAddrs[0], &subConn{addr: "a"})
testSubConnMap.Set(testAddrs[1], &subConn{addr: "b"})
testSubConnMap.Set(testAddrs[2], &subConn{addr: "c"})
testSubConnMap.Set(testAddrs[0], &subConn{addr: "a", weight: 3})
testSubConnMap.Set(testAddrs[1], &subConn{addr: "b", weight: 3})
testSubConnMap.Set(testAddrs[2], &subConn{addr: "c", weight: 4})
}

func testAddr(addr string, weight uint32) resolver.Address {
Expand Down
36 changes: 33 additions & 3 deletions xds/internal/balancer/ringhash/ringhash_test.go
Expand Up @@ -344,17 +344,26 @@ func (s) TestThreeSubConnsAffinityMultiple(t *testing.T) {
}
}

// TestAddrWeightChange covers the following scenarios after setting up the
// balancer with 3 addresses [A, B, C]:
// - updates balancer with [A, B, C], a new Picker should not be sent.
// - updates balancer with [A, B] (C removed), a new Picker is sent and the
// ring is updated.
// - updates balancer with [A, B], but B has a weight of 2, a new Picker is
// sent. And the new ring should contain the correct number of entries
// and weights.
func (s) TestAddrWeightChange(t *testing.T) {
wantAddrs := []resolver.Address{
addrs := []resolver.Address{
{Addr: testBackendAddrStrs[0]},
{Addr: testBackendAddrStrs[1]},
{Addr: testBackendAddrStrs[2]},
}
cc, b, p0 := setupTest(t, wantAddrs)
cc, b, p0 := setupTest(t, addrs)
ring0 := p0.(*picker).ring

// Update with the same addresses, should not send a new Picker.
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: wantAddrs},
ResolverState: resolver.State{Addresses: addrs},
BalancerConfig: testConfig,
}); err != nil {
t.Fatalf("UpdateClientConnState returned err: %v", err)
Expand Down Expand Up @@ -407,6 +416,27 @@ func (s) TestAddrWeightChange(t *testing.T) {
if p2.(*picker).ring == ring1 {
t.Fatalf("new picker after changing address weight has the same ring as before, want different")
}
// With the new update, the ring must look like this:
// [
// {idx:0 sc: {addr: testBackendAddrStrs[0], weight: 1}},
// {idx:1 sc: {addr: testBackendAddrStrs[1], weight: 2}},
// {idx:2 sc: {addr: testBackendAddrStrs[2], weight: 2}},
// ].
if len(p2.(*picker).ring.items) != 3 {
t.Fatalf("new picker after changing address weight has %d entries, want 3", len(p2.(*picker).ring.items))
}
for _, i := range p2.(*picker).ring.items {
if i.sc.addr == testBackendAddrStrs[0] {
if i.sc.weight != 1 {
t.Fatalf("new picker after changing address weight has weight %d for %v, want 1", i.sc.weight, i.sc.addr)
}
}
if i.sc.addr == testBackendAddrStrs[1] {
if i.sc.weight != 2 {
t.Fatalf("new picker after changing address weight has weight %d for %v, want 2", i.sc.weight, i.sc.addr)
}
}
}
}

// TestSubConnToConnectWhenOverallTransientFailure covers the situation when the
Expand Down