@@ -7,7 +7,6 @@ package sticky
7
7
8
8
import (
9
9
"math"
10
- "sort"
11
10
12
11
"github.com/twmb/franz-go/pkg/kbin"
13
12
"github.com/twmb/franz-go/pkg/kmsg"
@@ -138,7 +137,7 @@ func (b *balancer) into() Plan {
138
137
// partOwners is created by topic, and partNums refers to
139
138
// indices in partOwners. If we sort by partNum, we have sorted
140
139
// topics and partitions.
141
- sort . Sort ( & partNums ) //nolint:gosec // sorting the slice, not using the pointer across iter
140
+ sortPartNums ( partNums )
142
141
143
142
// We can reuse partNums for our topic partitions.
144
143
topicParts := partNums [:0 ]
@@ -203,10 +202,6 @@ func (m *memberPartitions) add(partNum int32) {
203
202
* m = append (* m , partNum )
204
203
}
205
204
206
- func (m * memberPartitions ) Len () int { return len (* m ) }
207
- func (m * memberPartitions ) Less (i , j int ) bool { return (* m )[i ] < (* m )[j ] }
208
- func (m * memberPartitions ) Swap (i , j int ) { (* m )[i ], (* m )[j ] = (* m )[j ], (* m )[i ] }
209
-
210
205
// membersPartitions maps members to their partitions.
211
206
type membersPartitions []memberPartitions
212
207
@@ -479,33 +474,42 @@ func (b *balancer) assignUnassignedAndInitGraph() {
479
474
// all topics. This benefits the standard case the most, where all
480
475
// members consume equally.
481
476
for memberNum := range b .plan {
482
- partNums := b .plan [memberNum ]
483
- sort .Slice (partNums , func (i , j int ) bool {
484
- lpNum , rpNum := partNums [i ], partNums [j ]
485
- ltNum , rtNum := b .partOwners [lpNum ], b .partOwners [rpNum ]
486
- li , ri := b .topicInfos [ltNum ], b .topicInfos [rtNum ]
487
- lt , rt := li .topic , ri .topic
488
- lp , rp := lpNum - li .partNum , rpNum - ri .partNum
489
- return lp < rp || (lp == rp && lt < rt )
490
- })
491
- }
492
-
493
- for _ , potentials := range topicPotentials {
494
- (& membersByPartitions {potentials , b .plan }).init ()
477
+ b .sortMemberByLiteralPartNum (memberNum )
495
478
}
496
479
497
- for partNum , owner := range partitionConsumers {
498
- if owner .memberNum != unassignedPart {
499
- continue
480
+ if ! b .isComplex && len (topicPotentials ) > 0 {
481
+ potentials := topicPotentials [0 ]
482
+ (& membersByPartitions {potentials , b .plan }).init ()
483
+ for partNum , owner := range partitionConsumers {
484
+ if owner .memberNum != unassignedPart {
485
+ continue
486
+ }
487
+ assigned := potentials [0 ]
488
+ b .plan [assigned ].add (int32 (partNum ))
489
+ (& membersByPartitions {potentials , b .plan }).fix0 ()
490
+ partitionConsumers [partNum ].memberNum = assigned
500
491
}
501
- potentials := topicPotentials [b.partOwners [partNum ]]
502
- if len (potentials ) == 0 {
503
- continue
492
+ } else {
493
+ for partNum , owner := range partitionConsumers {
494
+ if owner .memberNum != unassignedPart {
495
+ continue
496
+ }
497
+ potentials := topicPotentials [b.partOwners [partNum ]]
498
+ if len (potentials ) == 0 {
499
+ continue
500
+ }
501
+ leastConsumingPotential := potentials [0 ]
502
+ leastConsuming := len (b .plan [leastConsumingPotential ])
503
+ for _ , potential := range potentials [1 :] {
504
+ potentialConsuming := len (b .plan [potential ])
505
+ if potentialConsuming < leastConsuming {
506
+ leastConsumingPotential = potential
507
+ leastConsuming = potentialConsuming
508
+ }
509
+ }
510
+ b .plan [leastConsumingPotential ].add (int32 (partNum ))
511
+ partitionConsumers [partNum ].memberNum = leastConsumingPotential
504
512
}
505
- assigned := potentials [0 ]
506
- b .plan [assigned ].add (int32 (partNum ))
507
- (& membersByPartitions {potentials , b .plan }).fix0 ()
508
- partitionConsumers [partNum ].memberNum = assigned
509
513
}
510
514
511
515
// Lastly, with everything assigned, we build our steal graph for
@@ -553,7 +557,7 @@ func (b *balancer) tryRestickyStales(
553
557
currentOwner := partitionConsumers [staleNum ].memberNum
554
558
lastOwnerPartitions := & b .plan [lastOwnerNum ]
555
559
currentOwnerPartitions := & b .plan [currentOwner ]
556
- if lastOwnerPartitions . Len ( )+ 1 < currentOwnerPartitions . Len ( ) {
560
+ if len ( * lastOwnerPartitions )+ 1 < len ( * currentOwnerPartitions ) {
557
561
currentOwnerPartitions .remove (staleNum )
558
562
lastOwnerPartitions .add (staleNum )
559
563
}
@@ -704,8 +708,8 @@ func (b *balancer) reassignPartition(src, dst uint16, partNum int32) {
704
708
srcPartitions := & b .plan [src ]
705
709
dstPartitions := & b .plan [dst ]
706
710
707
- oldSrcLevel := srcPartitions . Len ( )
708
- oldDstLevel := dstPartitions . Len ( )
711
+ oldSrcLevel := len ( * srcPartitions )
712
+ oldDstLevel := len ( * dstPartitions )
709
713
710
714
srcPartitions .remove (partNum )
711
715
dstPartitions .add (partNum )
0 commit comments