Skip to content

Commit dcfcacb

Browse files
committedNov 15, 2022
{Cooperative,Sticky}Balancer: bug fix lack of stickiness
We clear nowAssigned at the end of a group session, meaning using it for our prior assignment is invalid and would always be empty. Thus, we would never preserve stickiness. Now, we use our lastAssigned for stickiness, we *always* set it, and we set it after using the current/prior values and before we clear nowAssigned.
1 parent 8105c36 commit dcfcacb

File tree

1 file changed

+9
-7
lines changed

1 file changed

+9
-7
lines changed
 

‎pkg/kgo/consumer_group.go

+9-7
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,7 @@ func (g *groupConsumer) leave() (wait func()) {
458458
// returns the difference of g.nowAssigned and g.lastAssigned.
459459
func (g *groupConsumer) diffAssigned() (added, lost map[string][]int32) {
460460
nowAssigned := g.nowAssigned.clone()
461-
if g.lastAssigned == nil {
461+
if !g.cooperative {
462462
return nowAssigned, nil
463463
}
464464

@@ -749,6 +749,8 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() (string, error) {
749749

750750
s := newAssignRevokeSession()
751751
added, lost := g.diffAssigned()
752+
g.lastAssigned = g.nowAssigned.clone() // now that we are done with our last assignment, update it per the new assignment
753+
752754
g.cfg.logger.Log(LogLevelInfo, "new group session begun", "group", g.cfg.group, "added", mtps(added), "lost", mtps(lost))
753755
s.prerevoke(g, lost) // for cooperative consumers
754756

@@ -1330,9 +1332,6 @@ func (g *groupConsumer) handleSyncResp(protocol string, resp *kmsg.SyncGroupResp
13301332

13311333
// Past this point, we will fall into the setupAssigned prerevoke code,
13321334
// meaning for cooperative, we will revoke what we need to.
1333-
if g.cooperative {
1334-
g.lastAssigned = g.nowAssigned.clone()
1335-
}
13361335
g.nowAssigned.store(assigned)
13371336
return nil
13381337
}
@@ -1344,21 +1343,24 @@ func (g *groupConsumer) joinGroupProtocols() []kmsg.JoinGroupRequestProtocol {
13441343
for topic := range g.using {
13451344
topics = append(topics, topic)
13461345
}
1347-
nowDup := g.nowAssigned.clone() // deep copy to allow modifications
1346+
lastDup := make(map[string][]int32, len(g.lastAssigned))
1347+
for t, ps := range g.lastAssigned {
1348+
lastDup[t] = append([]int32(nil), ps...) // deep copy to allow modifications
1349+
}
13481350
gen := g.generation
13491351

13501352
g.mu.Unlock()
13511353

13521354
sort.Strings(topics) // we guarantee to JoinGroupMetadata that the input strings are sorted
1353-
for _, partitions := range nowDup {
1355+
for _, partitions := range lastDup {
13541356
sort.Slice(partitions, func(i, j int) bool { return partitions[i] < partitions[j] }) // same for partitions
13551357
}
13561358

13571359
var protos []kmsg.JoinGroupRequestProtocol
13581360
for _, balancer := range g.cfg.balancers {
13591361
proto := kmsg.NewJoinGroupRequestProtocol()
13601362
proto.Name = balancer.ProtocolName()
1361-
proto.Metadata = balancer.JoinGroupMetadata(topics, nowDup, gen)
1363+
proto.Metadata = balancer.JoinGroupMetadata(topics, lastDup, gen)
13621364
protos = append(protos, proto)
13631365
}
13641366
return protos

0 commit comments

Comments
 (0)
Please sign in to comment.