Skip to content

Commit 1429d47

Browse files
committed Oct 21, 2023
sticky balancer: try for better topic distribution among members
The sticky balancer currently strives for ultimate stickiness, with no regard for balancing each topic's partitions equally among members. When adding a member, it is often the case that an entire topic's partitions shift to the new member, while the first member keeps the other topic. By sorting each member's partitions by partition number before balancing, when the algorithm steals partitions from the end of an existing member's list to give to the new member, we ensure that we divvy up the topics equally between both members while still ensuring stickiness. This is likely not perfect, but it goes a long way.
1 parent 6a961da commit 1429d47

File tree

2 files changed

+42
-0
lines changed

2 files changed

+42
-0
lines changed
 

‎pkg/kgo/internal/sticky/sticky.go

+18
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,24 @@ func (b *balancer) assignUnassignedAndInitGraph() {
472472
}
473473

474474
b.tryRestickyStales(topicPotentials, partitionConsumers)
475+
476+
// For each member, we now sort their current partitions by partition,
477+
// then topic. Sorting the lowest numbers first means that once we
478+
// steal from the end (when adding a member), we steal equally across
479+
// all topics. This benefits the standard case the most, where all
480+
// members consume equally.
481+
for memberNum := range b.plan {
482+
partNums := b.plan[memberNum]
483+
sort.Slice(partNums, func(i, j int) bool {
484+
lpNum, rpNum := partNums[i], partNums[j]
485+
ltNum, rtNum := b.partOwners[lpNum], b.partOwners[rpNum]
486+
li, ri := b.topicInfos[ltNum], b.topicInfos[rtNum]
487+
lt, rt := li.topic, ri.topic
488+
lp, rp := lpNum-li.partNum, rpNum-ri.partNum
489+
return lp < rp || (lp == rp && lt < rt)
490+
})
491+
}
492+
475493
for _, potentials := range topicPotentials {
476494
(&membersByPartitions{potentials, b.plan}).init()
477495
}

‎pkg/kgo/internal/sticky/sticky_test.go

+24
Original file line numberDiff line numberDiff line change
@@ -1517,6 +1517,30 @@ func Test_stickyBalanceStrategy_Plan_AssignmentWithMultipleGenerations2(t *testi
15171517
testPlanUsage(t, plan3, topics, nil)
15181518
}
15191519

1520+
func Test_stickyAddEqualMove(t *testing.T) {
1521+
t.Parallel()
1522+
topics := map[string]int32{"foo": 16, "bar": 16}
1523+
members := []GroupMember{
1524+
{ID: "1", Topics: []string{"foo", "bar"}},
1525+
}
1526+
plan1 := Balance(members, topics)
1527+
1528+
// PLAN 2
1529+
members[0].UserData = udEncode(1, 1, plan1["1"])
1530+
members = append(members, GroupMember{
1531+
ID: "2", Topics: []string{"foo", "bar"},
1532+
})
1533+
1534+
plan2 := Balance(members, topics)
1535+
testEqualDivvy(t, plan2, 16, members)
1536+
testPlanUsage(t, plan2, topics, nil)
1537+
1538+
if len(plan2["1"]["foo"]) != 8 || len(plan2["1"]["bar"]) != 8 ||
1539+
len(plan2["2"]["foo"]) != 8 || len(plan2["2"]["bar"]) != 8 {
1540+
t.Errorf("bad distribution: %v", plan2)
1541+
}
1542+
}
1543+
15201544
func Test_stickyBalanceStrategy_Plan_AssignmentWithConflictingPreviousGenerations(t *testing.T) {
15211545
t.Parallel()
15221546

0 commit comments

Comments
 (0)
Please sign in to comment.