@@ -2580,13 +2580,13 @@ func (g *groupConsumer) commitOffsetsSync(
2580
2580
onDone = func (* Client , * kmsg.OffsetCommitRequest , * kmsg.OffsetCommitResponse , error ) {}
2581
2581
}
2582
2582
2583
- g .syncCommitMu .Lock () // block all other concurrent commits until our OnDone is done.
2584
-
2585
2583
if err := g .waitJoinSyncMu (ctx ); err != nil {
2586
2584
onDone (g .cl , kmsg .NewPtrOffsetCommitRequest (), kmsg .NewPtrOffsetCommitResponse (), err )
2587
2585
close (done )
2588
2586
return
2589
2587
}
2588
+
2589
+ g .syncCommitMu .Lock () // block all other concurrent commits until our OnDone is done.
2590
2590
unblockCommits := func (cl * Client , req * kmsg.OffsetCommitRequest , resp * kmsg.OffsetCommitResponse , err error ) {
2591
2591
g .noCommitDuringJoinAndSync .RUnlock ()
2592
2592
defer close (done )
@@ -2663,19 +2663,16 @@ func (cl *Client) CommitOffsets(
2663
2663
return
2664
2664
}
2665
2665
2666
- g .syncCommitMu .RLock () // block sync commit, but allow other concurrent Commit to cancel us
2667
- unblockSyncCommit := func (cl * Client , req * kmsg.OffsetCommitRequest , resp * kmsg.OffsetCommitResponse , err error ) {
2668
- defer g .syncCommitMu .RUnlock ()
2669
- onDone (cl , req , resp , err )
2670
- }
2671
-
2672
2666
if err := g .waitJoinSyncMu (ctx ); err != nil {
2673
2667
onDone (g .cl , kmsg .NewPtrOffsetCommitRequest (), kmsg .NewPtrOffsetCommitResponse (), err )
2674
2668
return
2675
2669
}
2670
+
2671
+ g .syncCommitMu .RLock () // block sync commit, but allow other concurrent Commit to cancel us
2676
2672
unblockJoinSync := func (cl * Client , req * kmsg.OffsetCommitRequest , resp * kmsg.OffsetCommitResponse , err error ) {
2677
2673
g .noCommitDuringJoinAndSync .RUnlock ()
2678
- unblockSyncCommit (cl , req , resp , err )
2674
+ defer g .syncCommitMu .RUnlock ()
2675
+ onDone (cl , req , resp , err )
2679
2676
}
2680
2677
2681
2678
g .mu .Lock ()
0 commit comments