Skip to content

Commit 99d6dfb

Browse files
committedFeb 7, 2024
kgo: further fix for cd65d77
The prior commit was insufficient -- we left a dangling lock, and we had that same dangling lock in the cancelable commit offsets.
1 parent 41f0269 commit 99d6dfb

File tree

1 file changed

+6
-9
lines changed

1 file changed

+6
-9
lines changed
 

‎pkg/kgo/consumer_group.go

+6-9
Original file line numberDiff line numberDiff line change
@@ -2580,13 +2580,13 @@ func (g *groupConsumer) commitOffsetsSync(
25802580
onDone = func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error) {}
25812581
}
25822582

2583-
g.syncCommitMu.Lock() // block all other concurrent commits until our OnDone is done.
2584-
25852583
if err := g.waitJoinSyncMu(ctx); err != nil {
25862584
onDone(g.cl, kmsg.NewPtrOffsetCommitRequest(), kmsg.NewPtrOffsetCommitResponse(), err)
25872585
close(done)
25882586
return
25892587
}
2588+
2589+
g.syncCommitMu.Lock() // block all other concurrent commits until our OnDone is done.
25902590
unblockCommits := func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
25912591
g.noCommitDuringJoinAndSync.RUnlock()
25922592
defer close(done)
@@ -2663,19 +2663,16 @@ func (cl *Client) CommitOffsets(
26632663
return
26642664
}
26652665

2666-
g.syncCommitMu.RLock() // block sync commit, but allow other concurrent Commit to cancel us
2667-
unblockSyncCommit := func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
2668-
defer g.syncCommitMu.RUnlock()
2669-
onDone(cl, req, resp, err)
2670-
}
2671-
26722666
if err := g.waitJoinSyncMu(ctx); err != nil {
26732667
onDone(g.cl, kmsg.NewPtrOffsetCommitRequest(), kmsg.NewPtrOffsetCommitResponse(), err)
26742668
return
26752669
}
2670+
2671+
g.syncCommitMu.RLock() // block sync commit, but allow other concurrent Commit to cancel us
26762672
unblockJoinSync := func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
26772673
g.noCommitDuringJoinAndSync.RUnlock()
2678-
unblockSyncCommit(cl, req, resp, err)
2674+
defer g.syncCommitMu.RUnlock()
2675+
onDone(cl, req, resp, err)
26792676
}
26802677

26812678
g.mu.Lock()

0 commit comments

Comments
 (0)
Please sign in to comment.