8
8
"sync"
9
9
"time"
10
10
11
- "github.com/twmb/franz-go/pkg/kerr"
12
11
"github.com/twmb/franz-go/pkg/kmsg"
12
+
13
+ "github.com/twmb/franz-go/pkg/kerr"
13
14
)
14
15
15
16
// TransactionEndTry is simply a named bool.
@@ -1060,7 +1061,13 @@ func (cl *Client) commitTransactionOffsets(
1060
1061
onDone (kmsg .NewPtrTxnOffsetCommitRequest (), kmsg .NewPtrTxnOffsetCommitResponse (), errNotGroup )
1061
1062
return nil
1062
1063
}
1063
- if len (uncommitted ) == 0 {
1064
+
1065
+ req , err := g .prepareTxnOffsetCommit (ctx , uncommitted )
1066
+ if err != nil {
1067
+ onDone (req , kmsg .NewPtrTxnOffsetCommitResponse (), err )
1068
+ return g
1069
+ }
1070
+ if len (req .Topics ) == 0 {
1064
1071
onDone (kmsg .NewPtrTxnOffsetCommitRequest (), kmsg .NewPtrTxnOffsetCommitResponse (), nil )
1065
1072
return g
1066
1073
}
@@ -1088,7 +1095,7 @@ func (cl *Client) commitTransactionOffsets(
1088
1095
g .mu .Lock ()
1089
1096
defer g .mu .Unlock ()
1090
1097
1091
- g .commitTxn (ctx , uncommitted , unblockJoinSync )
1098
+ g .commitTxn (ctx , req , unblockJoinSync )
1092
1099
return g
1093
1100
}
1094
1101
@@ -1139,18 +1146,10 @@ func (cl *Client) addOffsetsToTxn(ctx context.Context, group string) error {
1139
1146
// commitTxn is ALMOST EXACTLY THE SAME as commit, but changed for txn types
1140
1147
// and we avoid updateCommitted. We avoid updating because we manually
1141
1148
// SetOffsets when ending the transaction.
1142
- func (g * groupConsumer ) commitTxn (
1143
- ctx context.Context ,
1144
- uncommitted map [string ]map [int32 ]EpochOffset ,
1145
- onDone func (* kmsg.TxnOffsetCommitRequest , * kmsg.TxnOffsetCommitResponse , error ),
1146
- ) {
1149
+ func (g * groupConsumer ) commitTxn (ctx context.Context , req * kmsg.TxnOffsetCommitRequest , onDone func (* kmsg.TxnOffsetCommitRequest , * kmsg.TxnOffsetCommitResponse , error )) {
1147
1150
if onDone == nil { // note we must always call onDone
1148
1151
onDone = func (_ * kmsg.TxnOffsetCommitRequest , _ * kmsg.TxnOffsetCommitResponse , _ error ) {}
1149
1152
}
1150
- if len (uncommitted ) == 0 { // only empty if called thru autocommit / default revoke
1151
- onDone (kmsg .NewPtrTxnOffsetCommitRequest (), kmsg .NewPtrTxnOffsetCommitResponse (), nil )
1152
- return
1153
- }
1154
1153
1155
1154
if g .commitCancel != nil {
1156
1155
g .commitCancel () // cancel any prior commit
@@ -1169,22 +1168,6 @@ func (g *groupConsumer) commitTxn(
1169
1168
g .commitCancel = commitCancel
1170
1169
g .commitDone = commitDone
1171
1170
1172
- // We issue this request even if the producer ID is failed; the request
1173
- // will fail if it is.
1174
- //
1175
- // The id must have been set at least once by this point because of
1176
- // addOffsetsToTxn.
1177
- id , epoch , _ := g .cl .producerID ()
1178
- req := kmsg .NewPtrTxnOffsetCommitRequest ()
1179
- req .TransactionalID = * g .cl .cfg .txnID
1180
- req .Group = g .cfg .group
1181
- req .ProducerID = id
1182
- req .ProducerEpoch = epoch
1183
- memberID , generation := g .memberGen .load ()
1184
- req .Generation = generation
1185
- req .MemberID = memberID
1186
- req .InstanceID = g .cfg .instanceID
1187
-
1188
1171
if ctx .Done () != nil {
1189
1172
go func () {
1190
1173
select {
@@ -1207,28 +1190,7 @@ func (g *groupConsumer) commitTxn(
1207
1190
<- priorDone // wait for any prior request to finish
1208
1191
}
1209
1192
}
1210
- g .cl .cfg .logger .Log (LogLevelDebug , "issuing txn offset commit" , "uncommitted" , uncommitted )
1211
-
1212
- for topic , partitions := range uncommitted {
1213
- reqTopic := kmsg .NewTxnOffsetCommitRequestTopic ()
1214
- reqTopic .Topic = topic
1215
- for partition , eo := range partitions {
1216
- reqPartition := kmsg .NewTxnOffsetCommitRequestTopicPartition ()
1217
- reqPartition .Partition = partition
1218
- reqPartition .Offset = eo .Offset
1219
- reqPartition .LeaderEpoch = eo .Epoch
1220
- reqPartition .Metadata = & req .MemberID
1221
- reqTopic .Partitions = append (reqTopic .Partitions , reqPartition )
1222
- }
1223
- req .Topics = append (req .Topics , reqTopic )
1224
- }
1225
-
1226
- if fn , ok := ctx .Value (txnCommitContextFn ).(func (* kmsg.TxnOffsetCommitRequest ) error ); ok {
1227
- if err := fn (req ); err != nil {
1228
- onDone (req , nil , err )
1229
- return
1230
- }
1231
- }
1193
+ g .cl .cfg .logger .Log (LogLevelDebug , "issuing txn offset commit" , "uncommitted" , req )
1232
1194
1233
1195
var resp * kmsg.TxnOffsetCommitResponse
1234
1196
var err error
@@ -1242,3 +1204,44 @@ func (g *groupConsumer) commitTxn(
1242
1204
onDone (req , resp , nil )
1243
1205
}()
1244
1206
}
1207
+
1208
+ func (g * groupConsumer ) prepareTxnOffsetCommit (ctx context.Context , uncommitted map [string ]map [int32 ]EpochOffset ) (* kmsg.TxnOffsetCommitRequest , error ) {
1209
+ req := kmsg .NewPtrTxnOffsetCommitRequest ()
1210
+
1211
+ // We're now generating the producerID before addOffsetsToTxn.
1212
+ // We will not make this request until after addOffsetsToTxn, but it's possible to fail here due to a failed producerID.
1213
+ id , epoch , err := g .cl .producerID ()
1214
+ if err != nil {
1215
+ return req , err
1216
+ }
1217
+
1218
+ req .TransactionalID = * g .cl .cfg .txnID
1219
+ req .Group = g .cfg .group
1220
+ req .ProducerID = id
1221
+ req .ProducerEpoch = epoch
1222
+ memberID , generation := g .memberGen .load ()
1223
+ req .Generation = generation
1224
+ req .MemberID = memberID
1225
+ req .InstanceID = g .cfg .instanceID
1226
+
1227
+ for topic , partitions := range uncommitted {
1228
+ reqTopic := kmsg .NewTxnOffsetCommitRequestTopic ()
1229
+ reqTopic .Topic = topic
1230
+ for partition , eo := range partitions {
1231
+ reqPartition := kmsg .NewTxnOffsetCommitRequestTopicPartition ()
1232
+ reqPartition .Partition = partition
1233
+ reqPartition .Offset = eo .Offset
1234
+ reqPartition .LeaderEpoch = eo .Epoch
1235
+ reqPartition .Metadata = & req .MemberID
1236
+ reqTopic .Partitions = append (reqTopic .Partitions , reqPartition )
1237
+ }
1238
+ req .Topics = append (req .Topics , reqTopic )
1239
+ }
1240
+
1241
+ if fn , ok := ctx .Value (txnCommitContextFn ).(func (* kmsg.TxnOffsetCommitRequest ) error ); ok {
1242
+ if err := fn (req ); err != nil {
1243
+ return req , err
1244
+ }
1245
+ }
1246
+ return req , nil
1247
+ }
0 commit comments