Skip to content

Commit 54a7418

Browse files
iamnoahtwmb
authored andcommittedOct 21, 2023
kgo: allow PreTxnCommitFnContext to modify empty offsets
This builds the TxnOffsetCommitRequest early so that the hook can modify it. If the modified request has no topics to commit, then we abort as though uncommitted was empty.
1 parent 39e28c0 commit 54a7418

File tree

1 file changed

+53
-50
lines changed

1 file changed

+53
-50
lines changed
 

‎pkg/kgo/txn.go

+53-50
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@ import (
88
"sync"
99
"time"
1010

11-
"github.com/twmb/franz-go/pkg/kerr"
1211
"github.com/twmb/franz-go/pkg/kmsg"
12+
13+
"github.com/twmb/franz-go/pkg/kerr"
1314
)
1415

1516
// TransactionEndTry is simply a named bool.
@@ -1060,7 +1061,13 @@ func (cl *Client) commitTransactionOffsets(
10601061
onDone(kmsg.NewPtrTxnOffsetCommitRequest(), kmsg.NewPtrTxnOffsetCommitResponse(), errNotGroup)
10611062
return nil
10621063
}
1063-
if len(uncommitted) == 0 {
1064+
1065+
req, err := g.prepareTxnOffsetCommit(ctx, uncommitted)
1066+
if err != nil {
1067+
onDone(req, kmsg.NewPtrTxnOffsetCommitResponse(), err)
1068+
return g
1069+
}
1070+
if len(req.Topics) == 0 {
10641071
onDone(kmsg.NewPtrTxnOffsetCommitRequest(), kmsg.NewPtrTxnOffsetCommitResponse(), nil)
10651072
return g
10661073
}
@@ -1088,7 +1095,7 @@ func (cl *Client) commitTransactionOffsets(
10881095
g.mu.Lock()
10891096
defer g.mu.Unlock()
10901097

1091-
g.commitTxn(ctx, uncommitted, unblockJoinSync)
1098+
g.commitTxn(ctx, req, unblockJoinSync)
10921099
return g
10931100
}
10941101

@@ -1139,18 +1146,10 @@ func (cl *Client) addOffsetsToTxn(ctx context.Context, group string) error {
11391146
// commitTxn is ALMOST EXACTLY THE SAME as commit, but changed for txn types
11401147
// and we avoid updateCommitted. We avoid updating because we manually
11411148
// SetOffsets when ending the transaction.
1142-
func (g *groupConsumer) commitTxn(
1143-
ctx context.Context,
1144-
uncommitted map[string]map[int32]EpochOffset,
1145-
onDone func(*kmsg.TxnOffsetCommitRequest, *kmsg.TxnOffsetCommitResponse, error),
1146-
) {
1149+
func (g *groupConsumer) commitTxn(ctx context.Context, req *kmsg.TxnOffsetCommitRequest, onDone func(*kmsg.TxnOffsetCommitRequest, *kmsg.TxnOffsetCommitResponse, error)) {
11471150
if onDone == nil { // note we must always call onDone
11481151
onDone = func(_ *kmsg.TxnOffsetCommitRequest, _ *kmsg.TxnOffsetCommitResponse, _ error) {}
11491152
}
1150-
if len(uncommitted) == 0 { // only empty if called thru autocommit / default revoke
1151-
onDone(kmsg.NewPtrTxnOffsetCommitRequest(), kmsg.NewPtrTxnOffsetCommitResponse(), nil)
1152-
return
1153-
}
11541153

11551154
if g.commitCancel != nil {
11561155
g.commitCancel() // cancel any prior commit
@@ -1169,22 +1168,6 @@ func (g *groupConsumer) commitTxn(
11691168
g.commitCancel = commitCancel
11701169
g.commitDone = commitDone
11711170

1172-
// We issue this request even if the producer ID is failed; the request
1173-
// will fail if it is.
1174-
//
1175-
// The id must have been set at least once by this point because of
1176-
// addOffsetsToTxn.
1177-
id, epoch, _ := g.cl.producerID()
1178-
req := kmsg.NewPtrTxnOffsetCommitRequest()
1179-
req.TransactionalID = *g.cl.cfg.txnID
1180-
req.Group = g.cfg.group
1181-
req.ProducerID = id
1182-
req.ProducerEpoch = epoch
1183-
memberID, generation := g.memberGen.load()
1184-
req.Generation = generation
1185-
req.MemberID = memberID
1186-
req.InstanceID = g.cfg.instanceID
1187-
11881171
if ctx.Done() != nil {
11891172
go func() {
11901173
select {
@@ -1207,28 +1190,7 @@ func (g *groupConsumer) commitTxn(
12071190
<-priorDone // wait for any prior request to finish
12081191
}
12091192
}
1210-
g.cl.cfg.logger.Log(LogLevelDebug, "issuing txn offset commit", "uncommitted", uncommitted)
1211-
1212-
for topic, partitions := range uncommitted {
1213-
reqTopic := kmsg.NewTxnOffsetCommitRequestTopic()
1214-
reqTopic.Topic = topic
1215-
for partition, eo := range partitions {
1216-
reqPartition := kmsg.NewTxnOffsetCommitRequestTopicPartition()
1217-
reqPartition.Partition = partition
1218-
reqPartition.Offset = eo.Offset
1219-
reqPartition.LeaderEpoch = eo.Epoch
1220-
reqPartition.Metadata = &req.MemberID
1221-
reqTopic.Partitions = append(reqTopic.Partitions, reqPartition)
1222-
}
1223-
req.Topics = append(req.Topics, reqTopic)
1224-
}
1225-
1226-
if fn, ok := ctx.Value(txnCommitContextFn).(func(*kmsg.TxnOffsetCommitRequest) error); ok {
1227-
if err := fn(req); err != nil {
1228-
onDone(req, nil, err)
1229-
return
1230-
}
1231-
}
1193+
g.cl.cfg.logger.Log(LogLevelDebug, "issuing txn offset commit", "uncommitted", req)
12321194

12331195
var resp *kmsg.TxnOffsetCommitResponse
12341196
var err error
@@ -1242,3 +1204,44 @@ func (g *groupConsumer) commitTxn(
12421204
onDone(req, resp, nil)
12431205
}()
12441206
}
1207+
1208+
func (g *groupConsumer) prepareTxnOffsetCommit(ctx context.Context, uncommitted map[string]map[int32]EpochOffset) (*kmsg.TxnOffsetCommitRequest, error) {
1209+
req := kmsg.NewPtrTxnOffsetCommitRequest()
1210+
1211+
// We're now generating the producerID before addOffsetsToTxn.
1212+
// We will not make this request until after addOffsetsToTxn, but it's possible to fail here due to a failed producerID.
1213+
id, epoch, err := g.cl.producerID()
1214+
if err != nil {
1215+
return req, err
1216+
}
1217+
1218+
req.TransactionalID = *g.cl.cfg.txnID
1219+
req.Group = g.cfg.group
1220+
req.ProducerID = id
1221+
req.ProducerEpoch = epoch
1222+
memberID, generation := g.memberGen.load()
1223+
req.Generation = generation
1224+
req.MemberID = memberID
1225+
req.InstanceID = g.cfg.instanceID
1226+
1227+
for topic, partitions := range uncommitted {
1228+
reqTopic := kmsg.NewTxnOffsetCommitRequestTopic()
1229+
reqTopic.Topic = topic
1230+
for partition, eo := range partitions {
1231+
reqPartition := kmsg.NewTxnOffsetCommitRequestTopicPartition()
1232+
reqPartition.Partition = partition
1233+
reqPartition.Offset = eo.Offset
1234+
reqPartition.LeaderEpoch = eo.Epoch
1235+
reqPartition.Metadata = &req.MemberID
1236+
reqTopic.Partitions = append(reqTopic.Partitions, reqPartition)
1237+
}
1238+
req.Topics = append(req.Topics, reqTopic)
1239+
}
1240+
1241+
if fn, ok := ctx.Value(txnCommitContextFn).(func(*kmsg.TxnOffsetCommitRequest) error); ok {
1242+
if err := fn(req); err != nil {
1243+
return req, err
1244+
}
1245+
}
1246+
return req, nil
1247+
}

0 commit comments

Comments
 (0)
Please sign in to comment.