@@ -478,8 +478,7 @@ func (cl *Client) waitTries(ctx context.Context, backoff time.Duration) bool {
// case, such as when a person explicitly assigns offsets with epochs, but we
// catch a few areas that would be returned from a broker itself.
//
- // This function is always used *after* at least one request has been issued,
- // so we do not check ensurePinged.
+ // This function is always used *after* at least one request has been issued.
//
// NOTE: This is a weak check; we check if any broker in the cluster supports
// the request. We use this function in three locations:
@@ -503,7 +502,6 @@ func (cl *Client) supportsOffsetForLeaderEpoch() bool {

// A broker may not support some requests we want to make. This function checks
// support. This should only be used *after* at least one successful response.
- // To absolutely ensure a response has been received, use ensurePinged.
func (cl *Client) supportsKeyVersion(key, version int16) bool {
    cl.brokersMu.RLock()
    defer cl.brokersMu.RUnlock()
@@ -742,6 +740,7 @@ func (cl *Client) Close() {
//
// ListOffsets
// OffsetFetch (if using v8+ for Kafka 3.0+)
+ // FindCoordinator (if using v4+ for Kafka 3.0+)
// DescribeGroups
// ListGroups
// DeleteRecords
@@ -757,10 +756,9 @@ func (cl *Client) Close() {
// ListTransactions
//
// Kafka 3.0 introduced batch OffsetFetch and batch FindCoordinator requests.
- // This function is forward-compatible for the old, singular OffsetFetch and
- // FindCoordinator requests, but is not backward-compatible for batched
- // requests. It is recommended to only use the old format unless you know you
- // are speaking to Kafka 3.0+.
+ // This function is forward and backward compatible: old requests will be
+ // batched as necessary, and batched requests will be split as necessary. It is
+ // recommended to always use batch requests for simplicity.
//
// In short, this method tries to do the correct thing depending on what type
// of request is being issued.
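For illustration (a sketch, not part of this commit; the client value cl, the helper name, and the group names are assumed), a caller on the public API can now issue the batched form directly and let the client down-convert for pre-3.0 brokers:

package kafkaexample

import (
    "context"

    "github.com/twmb/franz-go/pkg/kgo"
    "github.com/twmb/franz-go/pkg/kmsg"
)

// findGroupCoordinators issues one batched FindCoordinator request; against
// older brokers the client splits it per key and merges the responses back.
func findGroupCoordinators(ctx context.Context, cl *kgo.Client) (*kmsg.FindCoordinatorResponse, error) {
    req := kmsg.NewPtrFindCoordinatorRequest()
    req.CoordinatorType = 0 // 0 = group coordinator, 1 = transaction coordinator
    req.CoordinatorKeys = []string{"group-a", "group-b"} // v4+ batched form
    return req.RequestWith(ctx, cl)
}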
@@ -963,6 +961,7 @@ func (cl *Client) shardedRequest(ctx context.Context, req kmsg.Request) ([]Respo
    switch t := req.(type) {
    case *kmsg.ListOffsetsRequest, // key 2
        *kmsg.OffsetFetchRequest, // key 9
+         *kmsg.FindCoordinatorRequest, // key 10
        *kmsg.DescribeGroupsRequest, // key 15
        *kmsg.ListGroupsRequest, // key 16
        *kmsg.DeleteRecordsRequest, // key 21
@@ -978,24 +977,19 @@ func (cl *Client) shardedRequest(ctx context.Context, req kmsg.Request) ([]Respo
        *kmsg.ListTransactionsRequest: // key 66
        return cl.handleShardedReq(ctx, req)

-     // We support being forward-compatible with FindCoordinator, so we need
-     // to use our special hijack function that batches a singular key.
-     case *kmsg.FindCoordinatorRequest:
-         last, resp, err := cl.findCoordinator(ctx, t)
-         return shards(shard(last, req, resp, err)), nil
-     }
-
-     switch t := req.(type) {
    case *kmsg.MetadataRequest:
        // We hijack any metadata request so as to populate our
        // own brokers and controller ID.
        br, resp, err := cl.fetchMetadata(ctx, t, false)
        return shards(shard(br, req, resp, err)), nil
+
    case kmsg.AdminRequest:
        return shards(cl.handleAdminReq(ctx, t)), nil
+
    case kmsg.GroupCoordinatorRequest,
        kmsg.TxnCoordinatorRequest:
        return shards(cl.handleCoordinatorReq(ctx, t)), nil
+
    case *kmsg.ApiVersionsRequest:
        // As of v3, software name and version are required.
        // If they are missing, we use the config options.
@@ -1121,108 +1115,172 @@ type coordinatorKey struct {
}

type coordinatorLoad struct {
-     done chan struct{}
-     node int32
-     err  error
+     loadWait chan struct{}
+     node     int32
+     err      error
+ }
+
+ func (cl *Client) loadCoordinator(ctx context.Context, typ int8, key string) (*broker, error) {
+     berr := cl.loadCoordinators(ctx, typ, key)[key]
+     return berr.b, berr.err
}

- // findCoordinator is allows FindCoordinator request to be forward compatible,
- // by duplicating a top level request into a single-element batch request, and
- // downconverting the response.
- func (cl *Client) findCoordinator(ctx context.Context, req *kmsg.FindCoordinatorRequest) (*broker, *kmsg.FindCoordinatorResponse, error) {
-     var compat bool
-     if len(req.CoordinatorKeys) == 0 {
-         req.CoordinatorKeys = []string{req.CoordinatorKey}
-         compat = true
+ func (cl *Client) loadCoordinators(ctx context.Context, typ int8, keys ...string) map[string]brokerOrErr {
+     m := make(map[string]brokerOrErr, len(keys))
+     if len(keys) == 0 {
+         return m
    }
-     r := cl.retriable()
-     resp, err := req.RequestWith(ctx, r)
-     if resp != nil {
-         if compat && resp.Version >= 4 {
-             if l := len(resp.Coordinators); l != 1 {
-                 return r.last, resp, fmt.Errorf("unexpectedly received %d coordinators when requesting 1", l)
-             }

-             first := resp.Coordinators[0]
-             resp.ErrorCode = first.ErrorCode
-             resp.ErrorMessage = first.ErrorMessage
-             resp.NodeID = first.NodeID
-             resp.Host = first.Host
-             resp.Port = first.Port
+     toRequest := make(map[string]bool, len(keys)) // true == bypass the cache
+     for _, key := range keys {
+         if len(key) > 0 {
+             toRequest[key] = false
        }
    }
-     return r.last, resp, err
- }

- func (cl *Client) deleteStaleCoordinatorIfEqual(key coordinatorKey, current *coordinatorLoad) {
-     cl.coordinatorsMu.Lock()
-     defer cl.coordinatorsMu.Unlock()
-     if existing, ok := cl.coordinators[key]; ok && current == existing {
-         delete(cl.coordinators, key)
-     }
- }
+     // For each of these keys, we have two cases:
+     //
+     // 1) The key is cached. It is either loading or loaded. We do not
+     // request the key ourselves; we wait for the load to finish.
+     //
+     // 2) The key is not cached, and we request it.
+     //
+     // If a key is cached but the coordinator no longer exists for us, we
+     // re-request to refresh the coordinator by setting toRequest[key] to
+     // true (bypass cache).
+     //
+     // If we ever request a key ourselves, we do not request it again. We
+     // ensure this by deleting from toRequest. We also delete if the key
+     // was cached with no error.
+     //
+     // We could have some keys cached and some that need to be requested.
+     // We issue a request but do not request what is cached.
+     //
+     // Lastly, we only ever trigger one metadata update, which happens if
+     // we have an unknown coordinator after we load coordinators.
+     var hasLoadedBrokers bool
+     for len(toRequest) > 0 {
+         var loadWait chan struct{}
+         load2key := make(map[*coordinatorLoad][]string)

- // loadController returns the group/txn coordinator for the given key, retrying
- // as necessary. Any non-retriable error does not cache the coordinator.
- func (cl *Client) loadCoordinator(ctx context.Context, key coordinatorKey) (*broker, error) {
-     var restarted bool
- start:
-     cl.coordinatorsMu.Lock()
-     c, ok := cl.coordinators[key]
+         cl.coordinatorsMu.Lock()
+         for key, bypassCache := range toRequest {
+             c, ok := cl.coordinators[coordinatorKey{key, typ}]
+             if !ok || bypassCache {
+                 if loadWait == nil {
+                     loadWait = make(chan struct{})
+                 }
+                 c = &coordinatorLoad{
+                     loadWait: loadWait,
+                     err:      errors.New("coordinator was not returned in broker response"),
+                 }
+                 cl.coordinators[coordinatorKey{key, typ}] = c
+             }
+             load2key[c] = append(load2key[c], key)
+         }
+         cl.coordinatorsMu.Unlock()

-     if !ok {
-         c = &coordinatorLoad{
-             done: make(chan struct{}), // all requests for the same coordinator get collapsed into one
+         if loadWait == nil { // all coordinators were cached
+             hasLoadedBrokers = cl.waitCoordinatorLoad(ctx, typ, load2key, !hasLoadedBrokers, toRequest, m)
+             continue
        }
-         defer func() {
-             // If our load fails, we avoid caching the coordinator,
-             // but only if something else has not already replaced
-             // our pointer.
-             if c.err != nil {
-                 cl.deleteStaleCoordinatorIfEqual(key, c)
+
+         key2load := make(map[string]*coordinatorLoad)
+         req := kmsg.NewPtrFindCoordinatorRequest()
+         req.CoordinatorType = typ
+         for c, keys := range load2key {
+             if c.loadWait == loadWait { // if this is our wait, this is ours to request
+                 req.CoordinatorKeys = append(req.CoordinatorKeys, keys...)
+                 for _, key := range keys {
+                     key2load[key] = c
+                     delete(toRequest, key)
+                 }
            }
-             close(c.done)
-         }()
-         cl.coordinators[key] = c
-     }
-     cl.coordinatorsMu.Unlock()
+         }

-     if ok {
-         <-c.done
-         if c.err != nil {
-             return nil, c.err
+         shards := cl.RequestSharded(ctx, req)
+
+         for _, shard := range shards {
+             if shard.Err != nil {
+                 req := shard.Req.(*kmsg.FindCoordinatorRequest)
+                 for _, key := range req.CoordinatorKeys {
+                     c, ok := key2load[key]
+                     if ok {
+                         c.err = shard.Err
+                     }
+                 }
+             } else {
+                 resp, _ := shard.Resp.(*kmsg.FindCoordinatorResponse)
+                 for _, rc := range resp.Coordinators {
+                     c, ok := key2load[rc.Key]
+                     if ok {
+                         c.err = kerr.ErrorForCode(rc.ErrorCode)
+                         c.node = rc.NodeID
+                     }
+                 }
+             }
        }

-         // If brokerOrErr returns an error, then our cached coordinator
-         // is using metadata that has updated and removed knowledge of
-         // that coordinator. We delete the stale coordinator here and
-         // retry once. The retry will force a coordinator reload, and
-         // everything will be fresh. Any errors after that we keep.
-         b, err := cl.brokerOrErr(nil, c.node, &errUnknownCoordinator{c.node, key})
-         if err != nil && !restarted {
-             restarted = true
-             cl.deleteStaleCoordinatorIfEqual(key, c)
-             goto start
+         // For anything we loaded, if it has a load failure (including
+         // not being replied to), we remove the key from the cache. We
+         // do not want to cache erroring values.
+         //
+         // We range key2load, which contains only coordinators we are
+         // responsible for loading.
+         cl.coordinatorsMu.Lock()
+         for key, c := range key2load {
+             if c.err != nil {
+                 ck := coordinatorKey{key, typ}
+                 if loading, ok := cl.coordinators[ck]; ok && loading == c {
+                     delete(cl.coordinators, ck)
+                 }
+             }
        }
-         return b, err
-     }
+         cl.coordinatorsMu.Unlock()

-     var resp *kmsg.FindCoordinatorResponse
-     req := kmsg.NewPtrFindCoordinatorRequest()
-     req.CoordinatorKey = key.name
-     req.CoordinatorType = key.typ
-     _, resp, c.err = cl.findCoordinator(ctx, req)
-     if c.err != nil {
-         return nil, c.err
-     }
-     if c.err = kerr.ErrorForCode(resp.ErrorCode); c.err != nil {
-         return nil, c.err
+         close(loadWait)
+         hasLoadedBrokers = cl.waitCoordinatorLoad(ctx, typ, load2key, !hasLoadedBrokers, toRequest, m)
    }
-     c.node = resp.NodeID
+     return m
+ }

-     var b *broker
-     b, c.err = cl.brokerOrErr(ctx, c.node, &errUnknownCoordinator{c.node, key})
-     return b, c.err
+ // After some prep work, we wait for coordinators to load. We update toRequest
+ // values with true if the caller should bypass cache and re-load these
+ // coordinators.
+ //
+ // This returns if we load brokers, and populates m with results.
+ func (cl *Client) waitCoordinatorLoad(ctx context.Context, typ int8, load2key map[*coordinatorLoad][]string, shouldLoadBrokers bool, toRequest map[string]bool, m map[string]brokerOrErr) bool {
+     var loadedBrokers bool
+     for c, keys := range load2key {
+         <-c.loadWait
+         for _, key := range keys {
+             if c.err != nil {
+                 delete(toRequest, key)
+                 m[key] = brokerOrErr{nil, c.err}
+                 continue
+             }
+
+             var brokerCtx context.Context
+             if shouldLoadBrokers && !loadedBrokers {
+                 brokerCtx = ctx
+                 loadedBrokers = true
+             }
+
+             b, err := cl.brokerOrErr(brokerCtx, c.node, &errUnknownCoordinator{c.node, coordinatorKey{key, typ}})
+             if err != nil {
+                 if _, exists := toRequest[key]; exists {
+                     toRequest[key] = true
+                     continue
+                 }
+                 // If the key does not exist, we just loaded this
+                 // coordinator and also the brokers. We do not
+                 // re-request.
+             }
+             delete(toRequest, key)
+             m[key] = brokerOrErr{b, err}
+         }
+     }
+     return loadedBrokers
}

func (cl *Client) maybeDeleteStaleCoordinator(name string, typ int8, err error) bool {
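As a rough sketch of internal usage (the group names are made up; loadCoordinators, coordinatorTypeGroup, and brokerOrErr are the identifiers from this file), the batched loader replaces the old one-goroutine-per-key pattern with a single call:

// Sketch only: one call loads, caches, and deduplicates all keys at once.
berrs := cl.loadCoordinators(ctx, coordinatorTypeGroup, "group-a", "group-b", "group-a")
for group, berr := range berrs {
    if berr.err != nil {
        // Failed loads are not cached, so a later call re-requests this group.
        continue
    }
    _ = group
    _ = berr.b // the *broker currently hosting this group's coordinator
}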
@@ -1247,37 +1305,6 @@ type brokerOrErr struct {
    err error
}

- // loadCoordinators does a concurrent load of many coordinators.
- func (cl *Client) loadCoordinators(typ int8, names ...string) map[string]brokerOrErr {
-     uniq := make(map[string]struct{})
-     for _, name := range names {
-         uniq[name] = struct{}{}
-     }
-
-     var mu sync.Mutex
-     m := make(map[string]brokerOrErr)
-
-     var wg sync.WaitGroup
-     for uniqName := range uniq {
-         myName := uniqName
-         wg.Add(1)
-         go func() {
-             defer wg.Done()
-             coordinator, err := cl.loadCoordinator(cl.ctx, coordinatorKey{
-                 name: myName,
-                 typ:  typ,
-             })
-
-             mu.Lock()
-             defer mu.Unlock()
-             m[myName] = brokerOrErr{coordinator, err}
-         }()
-     }
-     wg.Wait()
-
-     return m
- }
-
func (cl *Client) handleAdminReq(ctx context.Context, req kmsg.Request) ResponseShard {
    // Loading a controller can perform some wait; we accept that and do
    // not account for the retries or the time to load the controller as
@@ -1411,10 +1438,7 @@ func (cl *Client) handleCoordinatorReq(ctx context.Context, req kmsg.Request) Re
// coordinator is deleted.
func (cl *Client) handleCoordinatorReqSimple(ctx context.Context, typ int8, name string, req kmsg.Request) ResponseShard {
    coordinator, resp, err := cl.handleReqWithCoordinator(ctx, func() (*broker, error) {
-         return cl.loadCoordinator(ctx, coordinatorKey{
-             name: name,
-             typ:  typ,
-         })
+         return cl.loadCoordinator(ctx, typ, name)
    }, typ, name, req)
    return shard(coordinator, req, resp, err)
}
@@ -1597,6 +1621,14 @@ type issueShard struct {

// sharder splits a request.
type sharder interface {
+     // If a request originally was not batched but the protocol later
+     // switched to batching, we always try the batched form first and then
+     // fall back.
+     //
+     // Requests that make this switch should always return pinReq requests,
+     // which we unpack before returning to end users / using internally.
+     unpackPinReq() bool
+
    // shard splits a request and returns the requests to issue tied to the
    // brokers to issue the requests to. This can return an error if there
    // is some pre-loading that needs to happen. If an error is returned,
@@ -1605,7 +1637,10 @@ type sharder interface {
    // Due to sharded requests not being retriable if a response is
    // received, to avoid stale coordinator errors, this function should
    // not use any previously cached metadata.
-     shard(context.Context, kmsg.Request) ([]issueShard, bool, error)
+     //
+     // This takes the last error if the request is being retried, which is
+     // currently only useful for errBrokerTooOld.
+     shard(context.Context, kmsg.Request, error) ([]issueShard, bool, error)

    // onResp is called on a successful response to investigate the
    // response and potentially perform cleanup, and potentially returns an
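To make the pinning idea concrete, here is a condensed sketch of the batch-first, split-on-fallback pattern that the FindCoordinator sharder further down implements (pinReq, issueShard, and errBrokerTooOld are existing identifiers in this package; this is illustration, not the committed code):

// Sketch only: first try one batched request pinned to v4+; if the broker is
// too old for that version, reshard into per-key requests pinned to v3 and lower.
func sketchFindCoordinatorShard(req *kmsg.FindCoordinatorRequest, lastErr error) ([]issueShard, bool, error) {
    if !errors.Is(lastErr, errBrokerTooOld) {
        return []issueShard{{req: &pinReq{Request: req, pinMin: true, min: 4}, any: true}}, true, nil
    }
    var issues []issueShard
    for _, key := range req.CoordinatorKeys {
        single := kmsg.NewPtrFindCoordinatorRequest()
        single.CoordinatorType = req.CoordinatorType
        single.CoordinatorKey = key
        issues = append(issues, issueShard{req: &pinReq{Request: single, pinMax: true, max: 3}, any: true})
    }
    return issues, false, nil
}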
@@ -1628,6 +1663,8 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
        sharder = &listOffsetsSharder{cl}
    case *kmsg.OffsetFetchRequest:
        sharder = &offsetFetchSharder{cl}
+     case *kmsg.FindCoordinatorRequest:
+         sharder = &findCoordinatorSharder{cl}
    case *kmsg.DescribeGroupsRequest:
        sharder = &describeGroupsSharder{cl}
    case *kmsg.ListGroupsRequest:
@@ -1660,8 +1697,9 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
// again). reqTry tracks how many total tries a request piece has had;
// we quit at either the max configured tries or max configured time.
type reqTry struct {
-     tries int
-     req   kmsg.Request
+     tries   int
+     req     kmsg.Request
+     lastErr error
}

var (
@@ -1687,8 +1725,9 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
    // issue is called to progressively split and issue requests.
    //
    // This recursively calls itself if a request fails and can be retried.
+     // We avoid stack problems because this calls itself in a goroutine.
    issue = func(try reqTry) {
-         issues, reshardable, err := sharder.shard(ctx, try.req)
+         issues, reshardable, err := sharder.shard(ctx, try.req, try.lastErr)
        if err != nil {
            l.Log(LogLevelDebug, "unable to shard request", "previous_tries", try.tries, "err", err)
            addShard(shard(nil, try.req, nil, err)) // failure to shard means data loading failed; this request is failed
@@ -1722,9 +1761,13 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res

        for i := range issues {
            myIssue := issues[i]
+             myUnderlyingReq := myIssue.req
+             if sharder.unpackPinReq() {
+                 myUnderlyingReq = myIssue.req.(*pinReq).Request
+             }

            if myIssue.err != nil {
-                 addShard(shard(nil, myIssue.req, nil, myIssue.err))
+                 addShard(shard(nil, myUnderlyingReq, nil, myIssue.err))
                continue
            }

@@ -1741,20 +1784,22 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
                    broker, err = cl.brokerOrErr(ctx, myIssue.broker, errUnknownBroker)
                }
                if err != nil {
-                     addShard(shard(nil, myIssue.req, nil, err)) // failure to load a broker is a failure to issue a request
+                     addShard(shard(nil, myUnderlyingReq, nil, err)) // failure to load a broker is a failure to issue a request
                    return
                }

                resp, err := broker.waitResp(ctx, myIssue.req)
                if err == nil {
-                     err = sharder.onResp(myIssue.req, resp) // perform some potential cleanup, and potentially receive an error to retry
+                     err = sharder.onResp(myUnderlyingReq, resp) // perform some potential cleanup, and potentially receive an error to retry
                }

                // If we failed to issue the request, we *maybe* will retry.
                // We could have failed to even issue the request or receive
                // a response, which is retriable.
                backoff := cl.cfg.retryBackoff(tries)
-                 if err != nil && (retryTimeout == 0 || time.Now().Add(backoff).Sub(start) < retryTimeout) && cl.shouldRetry(tries, err) && cl.waitTries(ctx, backoff) {
+                 if err != nil &&
+                     (retryTimeout == 0 || time.Now().Add(backoff).Sub(start) < retryTimeout) &&
+                     (reshardable && sharder.unpackPinReq() && errors.Is(err, errBrokerTooOld) || cl.shouldRetry(tries, err) && cl.waitTries(ctx, backoff)) {
                    // Non-reshardable re-requests just jump back to the
                    // top where the broker is loaded. This is the case on
                    // requests where the original request is split to
@@ -1764,16 +1809,16 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
                        goto start
                    }
                    l.Log(LogLevelDebug, "sharded request failed, resharding and reissuing", "time_since_start", time.Since(start), "tries", try.tries, "err", err)
-                     issue(reqTry{tries, myIssue.req})
+                     issue(reqTry{tries, myUnderlyingReq, err})
                    return
                }

-                 addShard(shard(broker, myIssue.req, resp, err)) // the error was not retriable
+                 addShard(shard(broker, myUnderlyingReq, resp, err)) // the error was not retriable
            }()
        }
    }

-     issue(reqTry{0, req})
+     issue(reqTry{0, req, nil})
    wg.Wait()

    return shards, sharder.merge
@@ -2031,7 +2076,9 @@ func (l *unknownErrShards) collect(mkreq, mergeParts interface{}) []issueShard {
// handles sharding ListOffsetsRequest
type listOffsetsSharder struct{ *Client }

- func (cl *listOffsetsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) {
+ func (*listOffsetsSharder) unpackPinReq() bool { return false }
+
+ func (cl *listOffsetsSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) {
    req := kreq.(*kmsg.ListOffsetsRequest)

    // For listing offsets, we need the broker leader for each partition we
@@ -2157,6 +2204,8 @@ func (*listOffsetsSharder) merge(sresps []ResponseShard) (kmsg.Response, error)
// handles sharding OffsetFetchRequest
type offsetFetchSharder struct{ *Client }

+ func (*offsetFetchSharder) unpackPinReq() bool { return true } // batch first, single group fallback
+
func offsetFetchReqToGroup(req *kmsg.OffsetFetchRequest) kmsg.OffsetFetchRequestGroup {
    g := kmsg.NewOffsetFetchRequestGroup()
    g.Group = req.Group
@@ -2169,6 +2218,19 @@ func offsetFetchReqToGroup(req *kmsg.OffsetFetchRequest) kmsg.OffsetFetchRequest
    return g
}

+ func offsetFetchGroupToReq(requireStable bool, group kmsg.OffsetFetchRequestGroup) *kmsg.OffsetFetchRequest {
+     req := kmsg.NewPtrOffsetFetchRequest()
+     req.RequireStable = requireStable
+     req.Group = group.Group
+     for _, topic := range group.Topics {
+         reqTopic := kmsg.NewOffsetFetchRequestTopic()
+         reqTopic.Topic = topic.Topic
+         reqTopic.Partitions = topic.Partitions
+         req.Topics = append(req.Topics, reqTopic)
+     }
+     return req
+ }
+
func offsetFetchRespToGroup(req *kmsg.OffsetFetchRequest, resp *kmsg.OffsetFetchResponse) kmsg.OffsetFetchResponseGroup {
    g := kmsg.NewOffsetFetchResponseGroup()
    g.Group = req.Group
@@ -2209,46 +2271,26 @@ func offsetFetchRespGroupIntoResp(g kmsg.OffsetFetchResponseGroup, into *kmsg.Of
    }
}

- func (cl *offsetFetchSharder) shard(_ context.Context, kreq kmsg.Request) ([]issueShard, bool, error) {
+ func (cl *offsetFetchSharder) shard(ctx context.Context, kreq kmsg.Request, lastErr error) ([]issueShard, bool, error) {
    req := kreq.(*kmsg.OffsetFetchRequest)

-     groups := make([]string, 0, len(req.Groups)+1)
-     if len(req.Groups) == 0 { // v0-v7
-         groups = append(groups, req.Group)
-     }
-     for i := range req.Groups { // v8+
-         groups = append(groups, req.Groups[i].Group)
-     }
-
-     coordinators := cl.loadCoordinators(coordinatorTypeGroup, groups...)
+     // We always try batching and only split at the end if lastErr
+     // indicates too old. We convert to batching immediately.
+     dup := *req
+     req = &dup

-     // If there is only the top level group, then we simply return our
-     // request mapped to its specific broker. For forward compatibility, we
-     // also embed the top level request into the Groups list: this allows
-     // operators of old request versions (v0-v7) to issue a v8 request
-     // appropriately. On response, if the length of groups is 1, we merge
-     // the first item back to the top level.
    if len(req.Groups) == 0 {
-         berr := coordinators[req.Group]
-         if berr.err != nil {
-             return []issueShard{{ //nolint:nilerr // error is returned in the struct
-                 req: req,
-                 err: berr.err,
-             }}, false, nil // not reshardable, because this is an error
+         req.Groups = append(req.Groups, offsetFetchReqToGroup(req))
+     }
+     groups := make([]string, 0, len(req.Groups))
+     for i := range req.Groups {
+         if g := req.Groups[i].Group; len(g) > 0 {
+             groups = append(groups, req.Groups[i].Group)
        }
-
-         dup := *req
-         brokerReq := &dup
-         brokerReq.Groups = append(brokerReq.Groups, offsetFetchReqToGroup(req))
-
-         return []issueShard{{
-             req:    brokerReq,
-             broker: berr.b.meta.NodeID,
-         }}, false, nil // reshardable to reload correct coordinator
    }

-     // v8+ behavior: we have multiple groups.
-     //
+     coordinators := cl.loadCoordinators(ctx, coordinatorTypeGroup, groups...)
+
    // Loading coordinators can have each group fail with its unique error,
    // or with a kerr.Error that can be merged. Unique errors get their own
    // failure shard, while kerr.Error's get merged.
@@ -2287,12 +2329,24 @@ func (cl *offsetFetchSharder) shard(_ context.Context, kreq kmsg.Request) ([]iss
        }
    }

+     splitReq := errors.Is(lastErr, errBrokerTooOld)
+
    var issues []issueShard
    for id, req := range brokerReqs {
-         issues = append(issues, issueShard{
-             req:    req,
-             broker: id,
-         })
+         if splitReq {
+             for _, group := range req.Groups {
+                 req := offsetFetchGroupToReq(req.RequireStable, group)
+                 issues = append(issues, issueShard{
+                     req:    &pinReq{Request: req, pinMax: true, max: 7},
+                     broker: id,
+                 })
+             }
+         } else {
+             issues = append(issues, issueShard{
+                 req:    &pinReq{Request: req, pinMin: true, min: 8},
+                 broker: id,
+             })
+         }
    }
    for _, unkerr := range unkerrs {
        issues = append(issues, issueShard{
@@ -2311,7 +2365,7 @@ func (cl *offsetFetchSharder) shard(_ context.Context, kreq kmsg.Request) ([]iss
}

func (cl *offsetFetchSharder) onResp(kreq kmsg.Request, kresp kmsg.Response) error {
-     req := kreq.(*kmsg.OffsetFetchRequest)
+     req := kreq.(*kmsg.OffsetFetchRequest) // we always issue pinned requests
    resp := kresp.(*kmsg.OffsetFetchResponse)

    switch len(resp.Groups) {
@@ -2365,13 +2419,106 @@ func (*offsetFetchSharder) merge(sresps []ResponseShard) (kmsg.Response, error)
    })
}

+ // handles sharding FindCoordinatorRequest
+ type findCoordinatorSharder struct{ *Client }
+
+ func (*findCoordinatorSharder) unpackPinReq() bool { return true } // batch first, single key fallback
+
+ func findCoordinatorRespCoordinatorIntoResp(c kmsg.FindCoordinatorResponseCoordinator, into *kmsg.FindCoordinatorResponse) {
+     into.NodeID = c.NodeID
+     into.Host = c.Host
+     into.Port = c.Port
+     into.ErrorCode = c.ErrorCode
+     into.ErrorMessage = c.ErrorMessage
+ }
+
+ func (*findCoordinatorSharder) shard(_ context.Context, kreq kmsg.Request, lastErr error) ([]issueShard, bool, error) {
+     req := kreq.(*kmsg.FindCoordinatorRequest)
+
+     // We always try batching and only split at the end if lastErr
+     // indicates too old. We convert to batching immediately.
+     dup := *req
+     req = &dup
+
+     uniq := make(map[string]struct{}, len(req.CoordinatorKeys))
+     uniq[req.CoordinatorKey] = struct{}{}
+     for _, key := range req.CoordinatorKeys {
+         uniq[key] = struct{}{}
+     }
+     req.CoordinatorKeys = req.CoordinatorKeys[:0]
+     for key := range uniq {
+         if len(key) > 0 {
+             req.CoordinatorKeys = append(req.CoordinatorKeys, key)
+         }
+     }
+
+     splitReq := errors.Is(lastErr, errBrokerTooOld)
+     if !splitReq {
+         return []issueShard{{
+             req: &pinReq{Request: req, pinMin: true, min: 4},
+             any: true,
+         }}, true, nil // this is "reshardable", in that we will split the request next
+     }
+
+     var issues []issueShard
+     for _, key := range req.CoordinatorKeys {
+         sreq := kmsg.NewPtrFindCoordinatorRequest()
+         sreq.CoordinatorType = req.CoordinatorType
+         sreq.CoordinatorKey = key
+         issues = append(issues, issueShard{
+             req: &pinReq{Request: sreq, pinMax: true, max: 3},
+             any: true,
+         })
+     }
+     return issues, false, nil // not reshardable
+ }
+
+ func (*findCoordinatorSharder) onResp(kreq kmsg.Request, kresp kmsg.Response) error {
+     req := kreq.(*kmsg.FindCoordinatorRequest) // we always issue pinned requests
+     resp := kresp.(*kmsg.FindCoordinatorResponse)
+
+     switch len(resp.Coordinators) {
+     case 0:
+         // Convert v3 and prior to v4+
+         rc := kmsg.NewFindCoordinatorResponseCoordinator()
+         rc.Key = req.CoordinatorKey
+         rc.NodeID = resp.NodeID
+         rc.Host = resp.Host
+         rc.Port = resp.Port
+         rc.ErrorCode = resp.ErrorCode
+         rc.ErrorMessage = resp.ErrorMessage
+         resp.Coordinators = append(resp.Coordinators, rc)
+     case 1:
+         // Convert v4 to v3 and prior
+         findCoordinatorRespCoordinatorIntoResp(resp.Coordinators[0], resp)
+     }
+
+     return nil
+ }
+
+ func (*findCoordinatorSharder) merge(sresps []ResponseShard) (kmsg.Response, error) {
+     merged := kmsg.NewPtrFindCoordinatorResponse()
+     return merged, firstErrMerger(sresps, func(kresp kmsg.Response) {
+         resp := kresp.(*kmsg.FindCoordinatorResponse)
+         merged.Version = resp.Version
+         merged.ThrottleMillis = resp.ThrottleMillis
+         merged.Coordinators = append(merged.Coordinators, resp.Coordinators...)
+
+         if len(resp.Coordinators) == 1 {
+             findCoordinatorRespCoordinatorIntoResp(resp.Coordinators[0], merged)
+         }
+     })
+ }
+
// handles sharding DescribeGroupsRequest
type describeGroupsSharder struct{ *Client }

- func (cl *describeGroupsSharder) shard(_ context.Context, kreq kmsg.Request) ([]issueShard, bool, error) {
+ func (*describeGroupsSharder) unpackPinReq() bool { return false }
+
+ func (cl *describeGroupsSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) {
    req := kreq.(*kmsg.DescribeGroupsRequest)

-     coordinators := cl.loadCoordinators(coordinatorTypeGroup, req.Groups...)
+     coordinators := cl.loadCoordinators(ctx, coordinatorTypeGroup, req.Groups...)
    type unkerr struct {
        err   error
        group string
@@ -2455,7 +2602,9 @@ func (*describeGroupsSharder) merge(sresps []ResponseShard) (kmsg.Response, erro
// handles sharding ListGroupsRequest
type listGroupsSharder struct{ *Client }

- func (cl *listGroupsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) {
+ func (*listGroupsSharder) unpackPinReq() bool { return false }
+
+ func (cl *listGroupsSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) {
    req := kreq.(*kmsg.ListGroupsRequest)
    return cl.allBrokersShardedReq(ctx, func() kmsg.Request {
        dup := *req
@@ -2484,7 +2633,9 @@ func (*listGroupsSharder) merge(sresps []ResponseShard) (kmsg.Response, error) {
// handle sharding DeleteRecordsRequest
type deleteRecordsSharder struct{ *Client }

- func (cl *deleteRecordsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) {
+ func (*deleteRecordsSharder) unpackPinReq() bool { return false }
+
+ func (cl *deleteRecordsSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) {
    req := kreq.(*kmsg.DeleteRecordsRequest)

    var need []string
@@ -2601,7 +2752,9 @@ func (*deleteRecordsSharder) merge(sresps []ResponseShard) (kmsg.Response, error
// handle sharding OffsetForLeaderEpochRequest
type offsetForLeaderEpochSharder struct{ *Client }

- func (cl *offsetForLeaderEpochSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) {
+ func (*offsetForLeaderEpochSharder) unpackPinReq() bool { return false }
+
+ func (cl *offsetForLeaderEpochSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) {
    req := kreq.(*kmsg.OffsetForLeaderEpochRequest)

    var need []string
@@ -2718,7 +2871,9 @@ func (*offsetForLeaderEpochSharder) merge(sresps []ResponseShard) (kmsg.Response
// handle sharding DescribeConfigsRequest
type describeConfigsSharder struct{ *Client }

- func (*describeConfigsSharder) shard(_ context.Context, kreq kmsg.Request) ([]issueShard, bool, error) {
+ func (*describeConfigsSharder) unpackPinReq() bool { return false }
+
+ func (*describeConfigsSharder) shard(_ context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) {
    req := kreq.(*kmsg.DescribeConfigsRequest)

    brokerReqs := make(map[int32][]kmsg.DescribeConfigsRequestResource)
@@ -2783,7 +2938,9 @@ func (*describeConfigsSharder) merge(sresps []ResponseShard) (kmsg.Response, err
// handle sharding AlterConfigsRequest
type alterConfigsSharder struct{ *Client }

- func (*alterConfigsSharder) shard(_ context.Context, kreq kmsg.Request) ([]issueShard, bool, error) {
+ func (*alterConfigsSharder) unpackPinReq() bool { return false }
+
+ func (*alterConfigsSharder) shard(_ context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) {
    req := kreq.(*kmsg.AlterConfigsRequest)

    brokerReqs := make(map[int32][]kmsg.AlterConfigsRequestResource)
@@ -2846,7 +3003,9 @@ func (*alterConfigsSharder) merge(sresps []ResponseShard) (kmsg.Response, error)
// handles sharding AlterReplicaLogDirsRequest
type alterReplicaLogDirsSharder struct{ *Client }

- func (cl *alterReplicaLogDirsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) {
+ func (*alterReplicaLogDirsSharder) unpackPinReq() bool { return false }
+
+ func (cl *alterReplicaLogDirsSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) {
    req := kreq.(*kmsg.AlterReplicaLogDirsRequest)

    needMap := make(map[string]struct{})
@@ -2991,7 +3150,9 @@ func (*alterReplicaLogDirsSharder) merge(sresps []ResponseShard) (kmsg.Response,
// handles sharding DescribeLogDirsRequest
type describeLogDirsSharder struct{ *Client }

- func (cl *describeLogDirsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) {
+ func (*describeLogDirsSharder) unpackPinReq() bool { return false }
+
+ func (cl *describeLogDirsSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) {
    req := kreq.(*kmsg.DescribeLogDirsRequest)

    // If req.Topics is nil, the request is to describe all logdirs. Thus,
@@ -3106,10 +3267,12 @@ func (*describeLogDirsSharder) merge(sresps []ResponseShard) (kmsg.Response, err
// handles sharding DeleteGroupsRequest
type deleteGroupsSharder struct{ *Client }

- func (cl *deleteGroupsSharder) shard(_ context.Context, kreq kmsg.Request) ([]issueShard, bool, error) {
+ func (*deleteGroupsSharder) unpackPinReq() bool { return false }
+
+ func (cl *deleteGroupsSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) {
    req := kreq.(*kmsg.DeleteGroupsRequest)

-     coordinators := cl.loadCoordinators(coordinatorTypeGroup, req.Groups...)
+     coordinators := cl.loadCoordinators(ctx, coordinatorTypeGroup, req.Groups...)
    type unkerr struct {
        err   error
        group string
@@ -3192,7 +3355,9 @@ func (*deleteGroupsSharder) merge(sresps []ResponseShard) (kmsg.Response, error)
// handle sharding IncrementalAlterConfigsRequest
type incrementalAlterConfigsSharder struct{ *Client }

- func (*incrementalAlterConfigsSharder) shard(_ context.Context, kreq kmsg.Request) ([]issueShard, bool, error) {
+ func (*incrementalAlterConfigsSharder) unpackPinReq() bool { return false }
+
+ func (*incrementalAlterConfigsSharder) shard(_ context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) {
    req := kreq.(*kmsg.IncrementalAlterConfigsRequest)

    brokerReqs := make(map[int32][]kmsg.IncrementalAlterConfigsRequestResource)
@@ -3255,7 +3420,9 @@ func (*incrementalAlterConfigsSharder) merge(sresps []ResponseShard) (kmsg.Respo
// handle sharding DescribeProducersRequest
type describeProducersSharder struct{ *Client }

- func (cl *describeProducersSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) {
+ func (*describeProducersSharder) unpackPinReq() bool { return false }
+
+ func (cl *describeProducersSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) {
    req := kreq.(*kmsg.DescribeProducersRequest)

    var need []string
@@ -3363,10 +3530,12 @@ func (*describeProducersSharder) merge(sresps []ResponseShard) (kmsg.Response, e
// handles sharding DescribeTransactionsRequest
type describeTransactionsSharder struct{ *Client }

- func (cl *describeTransactionsSharder) shard(_ context.Context, kreq kmsg.Request) ([]issueShard, bool, error) {
+ func (*describeTransactionsSharder) unpackPinReq() bool { return false }
+
+ func (cl *describeTransactionsSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) {
    req := kreq.(*kmsg.DescribeTransactionsRequest)

-     coordinators := cl.loadCoordinators(coordinatorTypeTxn, req.TransactionalIDs...)
+     coordinators := cl.loadCoordinators(ctx, coordinatorTypeTxn, req.TransactionalIDs...)
    type unkerr struct {
        err   error
        txnID string
@@ -3449,7 +3618,9 @@ func (*describeTransactionsSharder) merge(sresps []ResponseShard) (kmsg.Response
// handles sharding ListTransactionsRequest
type listTransactionsSharder struct{ *Client }

- func (cl *listTransactionsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) {
+ func (*listTransactionsSharder) unpackPinReq() bool { return false }
+
+ func (cl *listTransactionsSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) {
    req := kreq.(*kmsg.ListTransactionsRequest)
    return cl.allBrokersShardedReq(ctx, func() kmsg.Request {
        dup := *req