Skip to content

Commit 1827add

Browse files
committed Jul 20, 2024
kgo sink: fix read/write race for recBatch.canFailFromLoadErrs
When writing a record batch during a request, the batch mutex is locked. This guards against a concurrent failAllRecords, which can be triggered from a metadata update. However, the boolean field canFailFromLoadErrs, which prevents buffered records from being failed when doing so is not "safe", was not properly guarded by a mutex: writing a request locks only the batch, not the owning recBuf, while checking whether the batch could be failed locked only the owning recBuf, not the batch. This commit locks the batch when checking whether it can be failed, and adds a bool (isFailingFromLoadErr) that, if true (due to load failures), ensures the batch is not written. Closes #785.
1 parent a5f2b71 commit 1827add

File tree

1 file changed

+14
-1
lines changed

1 file changed

+14
-1
lines changed
 

‎pkg/kgo/sink.go

+14-1
Original file line numberDiff line numberDiff line change
@@ -714,6 +714,8 @@ func (s *sink) handleReqRespBatch(
714714

715715
// Since we have received a response and we are the first batch, we can
716716
// at this point re-enable failing from load errors.
717+
//
718+
// We do not need a lock since the owner is locked.
717719
batch.canFailFromLoadErrs = true
718720

719721
// By default, we assume we errored. Non-error updates this back
@@ -1294,6 +1296,10 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) {
12941296
batch0 := recBuf.batches[0]
12951297
batch0.tries++
12961298

1299+
// We need to lock the batch as well because there could be a buffered
1300+
// request about to be written. Writing requests only grabs the batch
1301+
// mu, not the recBuf mu.
1302+
batch0.mu.Lock()
12971303
var (
12981304
canFail = !recBuf.cl.idempotent() || batch0.canFailFromLoadErrs // we can only fail if we are not idempotent or if we have no outstanding requests
12991305
batch0Fail = batch0.maybeFailErr(&recBuf.cl.cfg) != nil // timeout, retries, or aborting
@@ -1303,6 +1309,8 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) {
13031309

13041310
willFail = canFail && (batch0Fail || !netErr && (!retryableKerr || retryableKerr && isUnknownLimit))
13051311
)
1312+
batch0.isFailingFromLoadErr = willFail
1313+
batch0.mu.Unlock()
13061314

13071315
recBuf.cl.cfg.logger.Log(LogLevelWarn, "produce partition load error, bumping error count on first stored batch",
13081316
"broker", logID(recBuf.sink.nodeID),
@@ -1316,6 +1324,7 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) {
13161324
"is_unknown_limit", isUnknownLimit,
13171325
"will_fail", willFail,
13181326
)
1327+
13191328
if willFail {
13201329
recBuf.failAllRecords(err)
13211330
}
@@ -1406,6 +1415,10 @@ type recBatch struct {
14061415
// request with this batch, and then reset it to true whenever we
14071416
// process a response.
14081417
canFailFromLoadErrs bool
1418+
// If we are going to fail the batch in bumpRepeatedLoadErr, we need to
1419+
// set this bool to true. There could be a concurrent request about to
1420+
// be written. See more comments below where this is used.
1421+
isFailingFromLoadErr bool
14091422

14101423
wireLength int32 // tracks total size this batch would currently encode as, including length prefix
14111424
v1wireLength int32 // same as wireLength, but for message set v1
@@ -1958,7 +1971,7 @@ func (p *produceRequest) AppendTo(dst []byte) []byte {
19581971
for partition, batch := range partitions {
19591972
dst = kbin.AppendInt32(dst, partition)
19601973
batch.mu.Lock()
1961-
if batch.records == nil { // concurrent failAllRecords
1974+
if batch.records == nil || batch.isFailingFromLoadErr { // concurrent failAllRecords OR concurrent bumpRepeatedLoadErr
19621975
if flexible {
19631976
dst = kbin.AppendCompactNullableBytes(dst, nil)
19641977
} else {

0 commit comments

Comments
 (0)
Please sign in to comment.