@@ -458,7 +458,7 @@ func (g *groupConsumer) leave() (wait func()) {
458
458
// returns the difference of g.nowAssigned and g.lastAssigned.
459
459
func (g * groupConsumer ) diffAssigned () (added , lost map [string ][]int32 ) {
460
460
nowAssigned := g .nowAssigned .clone ()
461
- if g . lastAssigned == nil {
461
+ if ! g . cooperative {
462
462
return nowAssigned , nil
463
463
}
464
464
@@ -749,6 +749,8 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() (string, error) {
749
749
750
750
s := newAssignRevokeSession ()
751
751
added , lost := g .diffAssigned ()
752
+ g .lastAssigned = g .nowAssigned .clone () // now that we are done with our last assignment, update it per the new assignment
753
+
752
754
g .cfg .logger .Log (LogLevelInfo , "new group session begun" , "group" , g .cfg .group , "added" , mtps (added ), "lost" , mtps (lost ))
753
755
s .prerevoke (g , lost ) // for cooperative consumers
754
756
@@ -1330,9 +1332,6 @@ func (g *groupConsumer) handleSyncResp(protocol string, resp *kmsg.SyncGroupResp
1330
1332
1331
1333
// Past this point, we will fall into the setupAssigned prerevoke code,
1332
1334
// meaning for cooperative, we will revoke what we need to.
1333
- if g .cooperative {
1334
- g .lastAssigned = g .nowAssigned .clone ()
1335
- }
1336
1335
g .nowAssigned .store (assigned )
1337
1336
return nil
1338
1337
}
@@ -1344,21 +1343,24 @@ func (g *groupConsumer) joinGroupProtocols() []kmsg.JoinGroupRequestProtocol {
1344
1343
for topic := range g .using {
1345
1344
topics = append (topics , topic )
1346
1345
}
1347
- nowDup := g .nowAssigned .clone () // deep copy to allow modifications
1346
+ lastDup := make (map [string ][]int32 , len (g .lastAssigned ))
1347
+ for t , ps := range g .lastAssigned {
1348
+ lastDup [t ] = append ([]int32 (nil ), ps ... ) // deep copy to allow modifications
1349
+ }
1348
1350
gen := g .generation
1349
1351
1350
1352
g .mu .Unlock ()
1351
1353
1352
1354
sort .Strings (topics ) // we guarantee to JoinGroupMetadata that the input strings are sorted
1353
- for _ , partitions := range nowDup {
1355
+ for _ , partitions := range lastDup {
1354
1356
sort .Slice (partitions , func (i , j int ) bool { return partitions [i ] < partitions [j ] }) // same for partitions
1355
1357
}
1356
1358
1357
1359
var protos []kmsg.JoinGroupRequestProtocol
1358
1360
for _ , balancer := range g .cfg .balancers {
1359
1361
proto := kmsg .NewJoinGroupRequestProtocol ()
1360
1362
proto .Name = balancer .ProtocolName ()
1361
- proto .Metadata = balancer .JoinGroupMetadata (topics , nowDup , gen )
1363
+ proto .Metadata = balancer .JoinGroupMetadata (topics , lastDup , gen )
1362
1364
protos = append (protos , proto )
1363
1365
}
1364
1366
return protos
0 commit comments