Skip to content

Commit 36b4437

Browse files
committedOct 31, 2023
sticky: further improvements
* Introduces separate functions for go 1.21+, allowing to eliminate unremoveable allocs from sort.Sort. To keep it simple, this simplifies <=1.20 a little bit, so that is **slightly** more inefficient. * Improves new-partition assignment further -- ensure we always place unassigned partitions on the least consuming member.
1 parent 6ebcb43 commit 36b4437

File tree

4 files changed

+100
-32
lines changed

4 files changed

+100
-32
lines changed
 

‎pkg/kgo/internal/sticky/go121.go

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
//go:build go1.21
2+
// +build go1.21
3+
4+
package sticky
5+
6+
import "slices"
7+
8+
func sortPartNums(ps memberPartitions) {
9+
slices.Sort(ps)
10+
}
11+
12+
func (b *balancer) sortMemberByLiteralPartNum(memberNum int) {
13+
partNums := b.plan[memberNum]
14+
slices.SortFunc(partNums, func(lpNum, rpNum int32) int {
15+
ltNum, rtNum := b.partOwners[lpNum], b.partOwners[rpNum]
16+
li, ri := b.topicInfos[ltNum], b.topicInfos[rtNum]
17+
lt, rt := li.topic, ri.topic
18+
lp, rp := lpNum-li.partNum, rpNum-ri.partNum
19+
if lp < rp {
20+
return -1
21+
} else if lp > rp {
22+
return 1
23+
} else if lt < rt {
24+
return -1
25+
}
26+
return 1
27+
})
28+
}

‎pkg/kgo/internal/sticky/goold.go

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
//go:build !go1.21
2+
// +build !go1.21
3+
4+
package sticky
5+
6+
import "sort"
7+
8+
func sortPartNums(partNums memberPartitions) {
9+
sort.Slice(partNums, func(i, j int) bool { return partNums[i] < partNums[j] })
10+
}
11+
12+
func (b *balancer) sortMemberByLiteralPartNum(memberNum int) {
13+
partNums := b.plan[memberNum]
14+
sort.Slice(partNums, func(i, j int) bool {
15+
lpNum, rpNum := partNums[i], partNums[j]
16+
ltNum, rtNum := b.partOwners[lpNum], b.partOwners[rpNum]
17+
li, ri := b.topicInfos[ltNum], b.topicInfos[rtNum]
18+
lt, rt := li.topic, ri.topic
19+
lp, rp := lpNum-li.partNum, rpNum-ri.partNum
20+
return lp < rp || (lp == rp && lt < rt)
21+
})
22+
}

‎pkg/kgo/internal/sticky/sticky.go

+36-32
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ package sticky
77

88
import (
99
"math"
10-
"sort"
1110

1211
"github.com/twmb/franz-go/pkg/kbin"
1312
"github.com/twmb/franz-go/pkg/kmsg"
@@ -138,7 +137,7 @@ func (b *balancer) into() Plan {
138137
// partOwners is created by topic, and partNums refers to
139138
// indices in partOwners. If we sort by partNum, we have sorted
140139
// topics and partitions.
141-
sort.Sort(&partNums) //nolint:gosec // sorting the slice, not using the pointer across iter
140+
sortPartNums(partNums)
142141

143142
// We can reuse partNums for our topic partitions.
144143
topicParts := partNums[:0]
@@ -203,10 +202,6 @@ func (m *memberPartitions) add(partNum int32) {
203202
*m = append(*m, partNum)
204203
}
205204

206-
func (m *memberPartitions) Len() int { return len(*m) }
207-
func (m *memberPartitions) Less(i, j int) bool { return (*m)[i] < (*m)[j] }
208-
func (m *memberPartitions) Swap(i, j int) { (*m)[i], (*m)[j] = (*m)[j], (*m)[i] }
209-
210205
// membersPartitions maps members to their partitions.
211206
type membersPartitions []memberPartitions
212207

@@ -479,33 +474,42 @@ func (b *balancer) assignUnassignedAndInitGraph() {
479474
// all topics. This benefits the standard case the most, where all
480475
// members consume equally.
481476
for memberNum := range b.plan {
482-
partNums := b.plan[memberNum]
483-
sort.Slice(partNums, func(i, j int) bool {
484-
lpNum, rpNum := partNums[i], partNums[j]
485-
ltNum, rtNum := b.partOwners[lpNum], b.partOwners[rpNum]
486-
li, ri := b.topicInfos[ltNum], b.topicInfos[rtNum]
487-
lt, rt := li.topic, ri.topic
488-
lp, rp := lpNum-li.partNum, rpNum-ri.partNum
489-
return lp < rp || (lp == rp && lt < rt)
490-
})
491-
}
492-
493-
for _, potentials := range topicPotentials {
494-
(&membersByPartitions{potentials, b.plan}).init()
477+
b.sortMemberByLiteralPartNum(memberNum)
495478
}
496479

497-
for partNum, owner := range partitionConsumers {
498-
if owner.memberNum != unassignedPart {
499-
continue
480+
if !b.isComplex && len(topicPotentials) > 0 {
481+
potentials := topicPotentials[0]
482+
(&membersByPartitions{potentials, b.plan}).init()
483+
for partNum, owner := range partitionConsumers {
484+
if owner.memberNum != unassignedPart {
485+
continue
486+
}
487+
assigned := potentials[0]
488+
b.plan[assigned].add(int32(partNum))
489+
(&membersByPartitions{potentials, b.plan}).fix0()
490+
partitionConsumers[partNum].memberNum = assigned
500491
}
501-
potentials := topicPotentials[b.partOwners[partNum]]
502-
if len(potentials) == 0 {
503-
continue
492+
} else {
493+
for partNum, owner := range partitionConsumers {
494+
if owner.memberNum != unassignedPart {
495+
continue
496+
}
497+
potentials := topicPotentials[b.partOwners[partNum]]
498+
if len(potentials) == 0 {
499+
continue
500+
}
501+
leastConsumingPotential := potentials[0]
502+
leastConsuming := len(b.plan[leastConsumingPotential])
503+
for _, potential := range potentials[1:] {
504+
potentialConsuming := len(b.plan[potential])
505+
if potentialConsuming < leastConsuming {
506+
leastConsumingPotential = potential
507+
leastConsuming = potentialConsuming
508+
}
509+
}
510+
b.plan[leastConsumingPotential].add(int32(partNum))
511+
partitionConsumers[partNum].memberNum = leastConsumingPotential
504512
}
505-
assigned := potentials[0]
506-
b.plan[assigned].add(int32(partNum))
507-
(&membersByPartitions{potentials, b.plan}).fix0()
508-
partitionConsumers[partNum].memberNum = assigned
509513
}
510514

511515
// Lastly, with everything assigned, we build our steal graph for
@@ -553,7 +557,7 @@ func (b *balancer) tryRestickyStales(
553557
currentOwner := partitionConsumers[staleNum].memberNum
554558
lastOwnerPartitions := &b.plan[lastOwnerNum]
555559
currentOwnerPartitions := &b.plan[currentOwner]
556-
if lastOwnerPartitions.Len()+1 < currentOwnerPartitions.Len() {
560+
if len(*lastOwnerPartitions)+1 < len(*currentOwnerPartitions) {
557561
currentOwnerPartitions.remove(staleNum)
558562
lastOwnerPartitions.add(staleNum)
559563
}
@@ -704,8 +708,8 @@ func (b *balancer) reassignPartition(src, dst uint16, partNum int32) {
704708
srcPartitions := &b.plan[src]
705709
dstPartitions := &b.plan[dst]
706710

707-
oldSrcLevel := srcPartitions.Len()
708-
oldDstLevel := dstPartitions.Len()
711+
oldSrcLevel := len(*srcPartitions)
712+
oldDstLevel := len(*dstPartitions)
709713

710714
srcPartitions.remove(partNum)
711715
dstPartitions.add(partNum)

‎pkg/kgo/internal/sticky/sticky_test.go

+14
Original file line numberDiff line numberDiff line change
@@ -1541,6 +1541,20 @@ func Test_stickyAddEqualMove(t *testing.T) {
15411541
}
15421542
}
15431543

1544+
func Test_stickyTwoJoinEqualBalance(t *testing.T) {
1545+
t.Parallel()
1546+
topics := map[string]int32{"foo": 16, "bar": 16}
1547+
members := []GroupMember{
1548+
{ID: "1", Topics: []string{"foo", "bar"}},
1549+
{ID: "2", Topics: []string{"foo", "bar"}},
1550+
}
1551+
plan := Balance(members, topics)
1552+
if len(plan["1"]["foo"]) != 8 || len(plan["1"]["bar"]) != 8 ||
1553+
len(plan["2"]["foo"]) != 8 || len(plan["2"]["bar"]) != 8 {
1554+
t.Errorf("bad distribution: %v", plan)
1555+
}
1556+
}
1557+
15441558
func Test_stickyBalanceStrategy_Plan_AssignmentWithConflictingPreviousGenerations(t *testing.T) {
15451559
t.Parallel()
15461560

0 commit comments

Comments
 (0)
Please sign in to comment.