
Commit e62b402

Committed Jun 10, 2024
kgo sink: do not back off on certain edge case
* Produce request created and about to be issued
* Metadata request resolves and removes the broker that was about to be sent to, and updates leadership for the partition
* recBuf's `sink` field is updated
* The old sink then enters handleReqResp, then eventually handleRetryBatches

Previously,

* Failed partition triggers a metadata refresh and enters a failed state until the metadata refresh clears the failing state. Because a metadata refresh JUST happened, internally this causes a 5s wait by default.

Now,

* Failed partition notices that it is actually NOW on a different broker than the broker that is handling the failure, does not back off at all, and potentially triggers draining on the new sink once decInflight runs.

Closes #746.
1 parent 40589af commit e62b402

File tree

1 file changed: +20 -0 lines changed
 

pkg/kgo/sink.go (+20)
@@ -942,6 +942,24 @@ func (s *sink) handleRetryBatches(
 			return
 		}
 
+		// If the request failed due to a concurrent metadata update
+		// moving partitions to a different sink (or killing the sink
+		// this partition was on), we can just reset the drain index
+		// and trigger draining now on the new sink. There is no reason
+		// to backoff on this sink nor trigger a metadata update.
+		if batch.owner.sink != s {
+			if debug {
+				logger.Log(LogLevelDebug, "transitioned sinks while a request was inflight, retrying immediately on new sink without backoff",
+					"topic", batch.owner.topic,
+					"partition", batch.owner.partition,
+					"old_sink", s.nodeID,
+					"new_sink", batch.owner.sink.nodeID,
+				)
+			}
+			batch.owner.resetBatchDrainIdx()
+			return
+		}
+
 		if canFail || s.cl.cfg.disableIdempotency {
 			if err := batch.maybeFailErr(&s.cl.cfg); err != nil {
 				batch.owner.failAllRecords(err)
@@ -1003,6 +1021,8 @@ func (s *sink) handleRetryBatches(
 	// If neither of these cases are true, then we entered wanting a
 	// metadata update, but the batches either were not the first batch, or
 	// the batches were concurrently failed.
+	//
+	// If all partitions are moving, we do not need to backoff nor drain.
 	if shouldBackoff || (!updateMeta && numRetryBatches != numMoveBatches) {
 		s.maybeTriggerBackoff(backoffSeq)
 		s.maybeDrain()
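
For readers who do not have kgo's internals paged in, below is a minimal, hypothetical Go sketch of the race the commit message describes and of the ownership check the first hunk adds. The types and helpers here (sinkSketch, recBufSketch, handleRetry) are simplified stand-ins invented for illustration, not kgo's real structures; only the shape of the check (batch.owner.sink != s) and the reset-then-return behavior follow the diff.

// Hypothetical sketch only: sinkSketch and recBufSketch are invented,
// simplified stand-ins for kgo's sink and recBuf types.
package main

import (
	"fmt"
	"sync"
)

type sinkSketch struct {
	nodeID int32
}

type recBufSketch struct {
	mu       sync.Mutex
	sink     *sinkSketch // which sink currently owns this partition's buffer
	drainIdx int         // how far this buffer has been drained
}

func (r *recBufSketch) currentSink() *sinkSketch {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.sink
}

func (r *recBufSketch) resetDrainIdx() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.drainIdx = 0
}

// handleRetry mirrors the shape of the new check in handleRetryBatches: if the
// buffer already moved to a different sink while the request was inflight,
// reset the drain index and skip both the backoff and the metadata trigger.
func handleRetry(old *sinkSketch, r *recBufSketch) string {
	if r.currentSink() != old {
		r.resetDrainIdx()
		return fmt.Sprintf("moved to node %d: retry immediately on new sink, no backoff", r.currentSink().nodeID)
	}
	return "still on old sink: back off and refresh metadata"
}

func main() {
	oldSink := &sinkSketch{nodeID: 1}
	newSink := &sinkSketch{nodeID: 2}
	buf := &recBufSketch{sink: oldSink, drainIdx: 3}

	// A concurrent metadata update repoints the buffer to a new broker
	// while the old sink's produce request is still inflight.
	buf.mu.Lock()
	buf.sink = newSink
	buf.mu.Unlock()

	// The old sink later handles the failed request and notices the move.
	fmt.Println(handleRetry(oldSink, buf))
}

Running the sketch prints the "moved" branch: the old sink neither backs off nor requests another metadata update, and draining resumes on the new sink (in kgo itself, once decInflight runs).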
