
Commit 9ac6c97

committed Nov 8, 2022
kgo: support forward & backward batch requests for FindCoordinator, OffsetFetch
Previously, the client was only forward compatible: opting into batched requests was at your own risk. We now default to batching and split back to the old single-key behavior as needed. To support this, we have a new internal request version pinner. We always pin to the batched version, and if we get errBrokerTooOld, we split. loadCoordinators is a good bit more complex now, and we essentially delete the old loadCoordinator. This is a bit complicated to describe in a single commit message.
1 parent ba55f7d commit 9ac6c97
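As context for the change, here is a minimal, hedged sketch of what the new default means for callers (the seed address, group names, and error handling are illustrative only, not part of this commit): a batched FindCoordinatorRequest can now be issued against any broker version, because the client pins the request to the batched (v4+) version and, on errBrokerTooOld, transparently splits it into single-key requests.

package main

import (
    "context"
    "fmt"

    "github.com/twmb/franz-go/pkg/kgo"
    "github.com/twmb/franz-go/pkg/kmsg"
)

func main() {
    cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092")) // illustrative seed broker
    if err != nil {
        panic(err)
    }
    defer cl.Close()

    // Batched (v4+) form; against pre-3.0 brokers the client now falls back
    // to one single-key FindCoordinator request per key.
    req := kmsg.NewPtrFindCoordinatorRequest()
    req.CoordinatorType = 0 // 0 == group coordinator
    req.CoordinatorKeys = []string{"group-a", "group-b"}

    resp, err := cl.Request(context.Background(), req)
    if err != nil {
        panic(err)
    }
    for _, c := range resp.(*kmsg.FindCoordinatorResponse).Coordinators {
        fmt.Println(c.Key, "->", c.NodeID, "err code:", c.ErrorCode)
    }
}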

File tree

2 files changed: +406 −202 lines

pkg/kgo/broker.go  +35 −2

@@ -22,6 +22,24 @@ import (
 	"github.com/twmb/franz-go/pkg/sasl"
 )
 
+type pinReq struct {
+	kmsg.Request
+	min    int16
+	max    int16
+	pinMin bool
+	pinMax bool
+}
+
+func (p *pinReq) SetVersion(v int16) {
+	if p.pinMin && v < p.min {
+		v = p.min
+	}
+	if p.pinMax && v > p.max {
+		v = p.max
+	}
+	p.Request.SetVersion(v)
+}
+
 type promisedReq struct {
 	ctx context.Context
 	req kmsg.Request
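The SetVersion override above clamps whatever version the connection negotiated into the pinned range. A standalone sketch of that rule (illustrative only; pinReq itself is unexported):

// clampVersion mirrors pinReq.SetVersion: the negotiated version v is raised
// to min when pinMin is set, and lowered to max when pinMax is set.
func clampVersion(v, min, max int16, pinMin, pinMax bool) int16 {
	if pinMin && v < min {
		v = min
	}
	if pinMax && v > max {
		v = max
	}
	return v
}

For example, a batched OffsetFetch shard is pinned with pinMin true and min 8; if the broker only speaks v7, the pin keeps the request at v8 and the handleReq check in the next hunk fails it with errBrokerTooOld, which is what triggers the single-group fallback.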
@@ -303,18 +321,33 @@ func (b *broker) handleReq(pr promisedReq) {
 		version = brokerMax
 	}
 
+	minVersion := int16(-1)
+
 	// If the version now (after potential broker downgrading) is
 	// lower than we desire, we fail the request for the broker is
 	// too old.
 	if b.cl.cfg.minVersions != nil {
-		minVersion, minVersionExists := b.cl.cfg.minVersions.LookupMaxKeyVersion(req.Key())
-		if minVersionExists && version < minVersion {
+		minVersion, _ = b.cl.cfg.minVersions.LookupMaxKeyVersion(req.Key())
+		if minVersion > -1 && version < minVersion {
 			pr.promise(nil, errBrokerTooOld)
 			return
 		}
 	}
 
 	req.SetVersion(version) // always go for highest version
+	setVersion := req.GetVersion()
+	if minVersion > -1 && setVersion < minVersion {
+		pr.promise(nil, fmt.Errorf("request key %d version returned %d below the user defined min of %d", req.Key(), setVersion, minVersion))
+		return
+	}
+	if version < setVersion {
+		// If we want to set an old version, but the request is pinned
+		// high, we need to fail with errBrokerTooOld. The broker wants
+		// an old version, we want a high version. We rely on this
+		// error in backcompat request sharding.
+		pr.promise(nil, errBrokerTooOld)
+		return
+	}
 
 	for reauthentications := 1; !cxn.expiry.IsZero() && time.Now().After(cxn.expiry); reauthentications++ {
 		// We allow 15 reauths, which is a lot. If a new lifetime is
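The minVersion bookkeeping above ties into the existing MinVersions client option. A short, hedged sketch of setting it (the seed address and version choice are illustrative):

package main

import (
    "github.com/twmb/franz-go/pkg/kgo"
    "github.com/twmb/franz-go/pkg/kversion"
)

func main() {
    // With MinVersions set, a request whose finally-chosen version falls
    // below the configured minimum is failed rather than silently downgraded.
    cl, err := kgo.NewClient(
        kgo.SeedBrokers("localhost:9092"),
        kgo.MinVersions(kversion.V2_8_0()),
    )
    if err != nil {
        panic(err)
    }
    defer cl.Close()
}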

pkg/kgo/client.go  +371 −200

@@ -478,8 +478,7 @@ func (cl *Client) waitTries(ctx context.Context, backoff time.Duration) bool {
 // case, such as when a person explicitly assigns offsets with epochs, but we
 // catch a few areas that would be returned from a broker itself.
 //
-// This function is always used *after* at least one request has been issued,
-// so we do not check ensurePinged.
+// This function is always used *after* at least one request has been issued.
 //
 // NOTE: This is a weak check; we check if any broker in the cluster supports
 // the request. We use this function in three locations:

@@ -503,7 +502,6 @@ func (cl *Client) supportsOffsetForLeaderEpoch() bool {
 
 // A broker may not support some requests we want to make. This function checks
 // support. This should only be used *after* at least one successful response.
-// To absolutely ensure a response has been received, use ensurePinged.
 func (cl *Client) supportsKeyVersion(key, version int16) bool {
 	cl.brokersMu.RLock()
 	defer cl.brokersMu.RUnlock()

@@ -742,6 +740,7 @@ func (cl *Client) Close() {
 //
 // ListOffsets
 // OffsetFetch (if using v8+ for Kafka 3.0+)
+// FindCoordinator (if using v4+ for Kafka 3.0+)
 // DescribeGroups
 // ListGroups
 // DeleteRecords

@@ -757,10 +756,9 @@ func (cl *Client) Close() {
 // ListTransactions
 //
 // Kafka 3.0 introduced batch OffsetFetch and batch FindCoordinator requests.
-// This function is forward-compatible for the old, singular OffsetFetch and
-// FindCoordinator requests, but is not backward-compatible for batched
-// requests. It is recommended to only use the old format unless you know you
-// are speaking to Kafka 3.0+.
+// This function is forward and backward compatible: old requests will be
+// batched as necessary, and batched requests will be split as necessary. It is
+// recommended to always use batch requests for simplicity.
 //
 // In short, this method tries to do the correct thing depending on what type
 // of request is being issued.
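To make the doc comment concrete, a hedged sketch of the two OffsetFetch shapes it refers to (group names are illustrative); with this commit either shape can be issued against both pre-3.0 and 3.0+ brokers:

package example

import (
    "context"

    "github.com/twmb/franz-go/pkg/kgo"
    "github.com/twmb/franz-go/pkg/kmsg"
)

// fetchOffsets issues both the old singular shape and the batched shape;
// the client batches or splits them as needed for the brokers it talks to.
func fetchOffsets(ctx context.Context, cl *kgo.Client) error {
    // Old, singular shape (v0-v7 wire format): one group at the top level.
    single := kmsg.NewPtrOffsetFetchRequest()
    single.Group = "group-a"
    if _, err := cl.Request(ctx, single); err != nil {
        return err
    }

    // Batched shape (v8+ wire format): many groups in one request.
    batched := kmsg.NewPtrOffsetFetchRequest()
    for _, g := range []string{"group-a", "group-b"} {
        rg := kmsg.NewOffsetFetchRequestGroup()
        rg.Group = g
        batched.Groups = append(batched.Groups, rg)
    }
    _, err := cl.Request(ctx, batched)
    return err
}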
@@ -963,6 +961,7 @@ func (cl *Client) shardedRequest(ctx context.Context, req kmsg.Request) ([]Respo
 	switch t := req.(type) {
 	case *kmsg.ListOffsetsRequest, // key 2
 		*kmsg.OffsetFetchRequest, // key 9
+		*kmsg.FindCoordinatorRequest, // key 10
 		*kmsg.DescribeGroupsRequest, // key 15
 		*kmsg.ListGroupsRequest, // key 16
 		*kmsg.DeleteRecordsRequest, // key 21

@@ -978,24 +977,19 @@ func (cl *Client) shardedRequest(ctx context.Context, req kmsg.Request) ([]Respo
 		*kmsg.ListTransactionsRequest: // key 66
 		return cl.handleShardedReq(ctx, req)
 
-	// We support being forward-compatible with FindCoordinator, so we need
-	// to use our special hijack function that batches a singular key.
-	case *kmsg.FindCoordinatorRequest:
-		last, resp, err := cl.findCoordinator(ctx, t)
-		return shards(shard(last, req, resp, err)), nil
-	}
-
-	switch t := req.(type) {
 	case *kmsg.MetadataRequest:
 		// We hijack any metadata request so as to populate our
 		// own brokers and controller ID.
 		br, resp, err := cl.fetchMetadata(ctx, t, false)
 		return shards(shard(br, req, resp, err)), nil
+
 	case kmsg.AdminRequest:
 		return shards(cl.handleAdminReq(ctx, t)), nil
+
 	case kmsg.GroupCoordinatorRequest,
 		kmsg.TxnCoordinatorRequest:
 		return shards(cl.handleCoordinatorReq(ctx, t)), nil
+
 	case *kmsg.ApiVersionsRequest:
 		// As of v3, software name and version are required.
 		// If they are missing, we use the config options.
@@ -1121,108 +1115,172 @@ type coordinatorKey struct {
 }
 
 type coordinatorLoad struct {
-	done chan struct{}
-	node int32
-	err  error
+	loadWait chan struct{}
+	node     int32
+	err      error
+}
+
+func (cl *Client) loadCoordinator(ctx context.Context, typ int8, key string) (*broker, error) {
+	berr := cl.loadCoordinators(ctx, typ, key)[key]
+	return berr.b, berr.err
 }
 
-// findCoordinator is allows FindCoordinator request to be forward compatible,
-// by duplicating a top level request into a single-element batch request, and
-// downconverting the response.
-func (cl *Client) findCoordinator(ctx context.Context, req *kmsg.FindCoordinatorRequest) (*broker, *kmsg.FindCoordinatorResponse, error) {
-	var compat bool
-	if len(req.CoordinatorKeys) == 0 {
-		req.CoordinatorKeys = []string{req.CoordinatorKey}
-		compat = true
+func (cl *Client) loadCoordinators(ctx context.Context, typ int8, keys ...string) map[string]brokerOrErr {
+	m := make(map[string]brokerOrErr, len(keys))
+	if len(keys) == 0 {
+		return m
 	}
-	r := cl.retriable()
-	resp, err := req.RequestWith(ctx, r)
-	if resp != nil {
-		if compat && resp.Version >= 4 {
-			if l := len(resp.Coordinators); l != 1 {
-				return r.last, resp, fmt.Errorf("unexpectedly received %d coordinators when requesting 1", l)
-			}
 
-			first := resp.Coordinators[0]
-			resp.ErrorCode = first.ErrorCode
-			resp.ErrorMessage = first.ErrorMessage
-			resp.NodeID = first.NodeID
-			resp.Host = first.Host
-			resp.Port = first.Port
+	toRequest := make(map[string]bool, len(keys)) // true == bypass the cache
+	for _, key := range keys {
+		if len(key) > 0 {
+			toRequest[key] = false
 		}
 	}
-	return r.last, resp, err
-}
 
-func (cl *Client) deleteStaleCoordinatorIfEqual(key coordinatorKey, current *coordinatorLoad) {
-	cl.coordinatorsMu.Lock()
-	defer cl.coordinatorsMu.Unlock()
-	if existing, ok := cl.coordinators[key]; ok && current == existing {
-		delete(cl.coordinators, key)
-	}
-}
+	// For each of these keys, we have two cases:
+	//
+	// 1) The key is cached. It is either loading or loaded. We do not
+	// request the key ourselves; we wait for the load to finish.
+	//
+	// 2) The key is not cached, and we request it.
+	//
+	// If a key is cached but the coordinator no longer exists for us, we
+	// re-request to refresh the coordinator by setting toRequest[key] to
+	// true (bypass cache).
+	//
+	// If we ever request a key ourselves, we do not request it again. We
+	// ensure this by deleting from toRequest. We also delete if the key
+	// was cached with no error.
+	//
+	// We could have some keys cached and some that need to be requested.
+	// We issue a request but do not request what is cached.
+	//
+	// Lastly, we only ever trigger one metadata update, which happens if
+	// we have an unknown coordinator after we load coordinators.
+	var hasLoadedBrokers bool
+	for len(toRequest) > 0 {
+		var loadWait chan struct{}
+		load2key := make(map[*coordinatorLoad][]string)
 
-// loadController returns the group/txn coordinator for the given key, retrying
-// as necessary. Any non-retriable error does not cache the coordinator.
-func (cl *Client) loadCoordinator(ctx context.Context, key coordinatorKey) (*broker, error) {
-	var restarted bool
-start:
-	cl.coordinatorsMu.Lock()
-	c, ok := cl.coordinators[key]
+		cl.coordinatorsMu.Lock()
+		for key, bypassCache := range toRequest {
+			c, ok := cl.coordinators[coordinatorKey{key, typ}]
+			if !ok || bypassCache {
+				if loadWait == nil {
+					loadWait = make(chan struct{})
+				}
+				c = &coordinatorLoad{
+					loadWait: loadWait,
+					err:      errors.New("coordinator was not returned in broker response"),
+				}
+				cl.coordinators[coordinatorKey{key, typ}] = c
+			}
+			load2key[c] = append(load2key[c], key)
+		}
+		cl.coordinatorsMu.Unlock()
 
-	if !ok {
-		c = &coordinatorLoad{
-			done: make(chan struct{}), // all requests for the same coordinator get collapsed into one
+		if loadWait == nil { // all coordinators were cached
+			hasLoadedBrokers = cl.waitCoordinatorLoad(ctx, typ, load2key, !hasLoadedBrokers, toRequest, m)
+			continue
 		}
-		defer func() {
-			// If our load fails, we avoid caching the coordinator,
-			// but only if something else has not already replaced
-			// our pointer.
-			if c.err != nil {
-				cl.deleteStaleCoordinatorIfEqual(key, c)
+
+		key2load := make(map[string]*coordinatorLoad)
+		req := kmsg.NewPtrFindCoordinatorRequest()
+		req.CoordinatorType = typ
+		for c, keys := range load2key {
+			if c.loadWait == loadWait { // if this is our wait, this is ours to request
+				req.CoordinatorKeys = append(req.CoordinatorKeys, keys...)
+				for _, key := range keys {
+					key2load[key] = c
+					delete(toRequest, key)
+				}
 			}
-			close(c.done)
-		}()
-		cl.coordinators[key] = c
-	}
-	cl.coordinatorsMu.Unlock()
+		}
 
-	if ok {
-		<-c.done
-		if c.err != nil {
-			return nil, c.err
+		shards := cl.RequestSharded(ctx, req)
+
+		for _, shard := range shards {
+			if shard.Err != nil {
+				req := shard.Req.(*kmsg.FindCoordinatorRequest)
+				for _, key := range req.CoordinatorKeys {
+					c, ok := key2load[key]
+					if ok {
+						c.err = shard.Err
+					}
+				}
+			} else {
+				resp, _ := shard.Resp.(*kmsg.FindCoordinatorResponse)
+				for _, rc := range resp.Coordinators {
+					c, ok := key2load[rc.Key]
+					if ok {
+						c.err = kerr.ErrorForCode(rc.ErrorCode)
+						c.node = resp.NodeID
+					}
+				}
+			}
 		}
 
-		// If brokerOrErr returns an error, then our cached coordinator
-		// is using metadata that has updated and removed knowledge of
-		// that coordinator. We delete the stale coordinator here and
-		// retry once. The retry will force a coordinator reload, and
-		// everything will be fresh. Any errors after that we keep.
-		b, err := cl.brokerOrErr(nil, c.node, &errUnknownCoordinator{c.node, key})
-		if err != nil && !restarted {
-			restarted = true
-			cl.deleteStaleCoordinatorIfEqual(key, c)
-			goto start
+		// For anything we loaded, if it has a load failure (including
+		// not being replied to), we remove the key from the cache. We
+		// do not want to cache erroring values.
+		//
+		// We range key2load, which contains only coordinators we are
+		// responsible for loading.
+		cl.coordinatorsMu.Lock()
+		for key, c := range key2load {
+			if c.err != nil {
+				ck := coordinatorKey{key, typ}
+				if loading, ok := cl.coordinators[ck]; ok && loading == c {
+					delete(cl.coordinators, ck)
+				}
+			}
 		}
-		return b, err
-	}
+		cl.coordinatorsMu.Unlock()
 
-	var resp *kmsg.FindCoordinatorResponse
-	req := kmsg.NewPtrFindCoordinatorRequest()
-	req.CoordinatorKey = key.name
-	req.CoordinatorType = key.typ
-	_, resp, c.err = cl.findCoordinator(ctx, req)
-	if c.err != nil {
-		return nil, c.err
-	}
-	if c.err = kerr.ErrorForCode(resp.ErrorCode); c.err != nil {
-		return nil, c.err
+		close(loadWait)
+		hasLoadedBrokers = cl.waitCoordinatorLoad(ctx, typ, load2key, !hasLoadedBrokers, toRequest, m)
 	}
-	c.node = resp.NodeID
+	return m
+}
 
-	var b *broker
-	b, c.err = cl.brokerOrErr(ctx, c.node, &errUnknownCoordinator{c.node, key})
-	return b, c.err
+// After some prep work, we wait for coordinators to load. We update toRequest
+// values with true if the caller should bypass cache and re-load these
+// coordinators.
+//
+// This returns if we load brokers, and populates m with results.
+func (cl *Client) waitCoordinatorLoad(ctx context.Context, typ int8, load2key map[*coordinatorLoad][]string, shouldLoadBrokers bool, toRequest map[string]bool, m map[string]brokerOrErr) bool {
+	var loadedBrokers bool
+	for c, keys := range load2key {
+		<-c.loadWait
+		for _, key := range keys {
+			if c.err != nil {
+				delete(toRequest, key)
+				m[key] = brokerOrErr{nil, c.err}
+				continue
+			}
+
+			var brokerCtx context.Context
+			if shouldLoadBrokers && !loadedBrokers {
+				brokerCtx = ctx
+				loadedBrokers = true
+			}
+
+			b, err := cl.brokerOrErr(brokerCtx, c.node, &errUnknownCoordinator{c.node, coordinatorKey{key, typ}})
+			if err != nil {
+				if _, exists := toRequest[key]; exists {
+					toRequest[key] = true
+					continue
+				}
+				// If the key does not exist, we just loaded this
+				// coordinator and also the brokers. We do not
+				// re-request.
+			}
+			delete(toRequest, key)
+			m[key] = brokerOrErr{b, err}
+		}
+	}
+	return loadedBrokers
 }
 
 func (cl *Client) maybeDeleteStaleCoordinator(name string, typ int8, err error) bool {
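The new loadCoordinators is, at its core, a keyed single-flight cache: the first caller to miss creates a coordinatorLoad with a loadWait channel, every other caller for the same key blocks on that channel, and failed loads are evicted so a later caller can retry. A generic, hedged sketch of just that pattern (names and types here are illustrative, not the kgo internals):

package example

import "sync"

// coordLoad mirrors the idea of coordinatorLoad: one load per key, a channel
// that waiters block on, and an error that decides whether the entry survives.
type coordLoad struct {
    done chan struct{}
    node int32
    err  error
}

type coordCache struct {
    mu sync.Mutex
    m  map[string]*coordLoad
}

func newCoordCache() *coordCache {
    return &coordCache{m: make(map[string]*coordLoad)}
}

// get collapses concurrent lookups of the same key onto one fetch; failed
// fetches are evicted so that a later caller can retry with fresh state.
func (c *coordCache) get(key string, fetch func(string) (int32, error)) (int32, error) {
    c.mu.Lock()
    l, ok := c.m[key]
    if !ok {
        l = &coordLoad{done: make(chan struct{})}
        c.m[key] = l
    }
    c.mu.Unlock()

    if ok {
        <-l.done // someone else is (or was) loading; wait for it
        return l.node, l.err
    }

    l.node, l.err = fetch(key)
    if l.err != nil {
        c.mu.Lock()
        if cur, ok := c.m[key]; ok && cur == l {
            delete(c.m, key) // do not cache erroring values
        }
        c.mu.Unlock()
    }
    close(l.done)
    return l.node, l.err
}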
@@ -1247,37 +1305,6 @@ type brokerOrErr struct {
 	err error
 }
 
-// loadCoordinators does a concurrent load of many coordinators.
-func (cl *Client) loadCoordinators(typ int8, names ...string) map[string]brokerOrErr {
-	uniq := make(map[string]struct{})
-	for _, name := range names {
-		uniq[name] = struct{}{}
-	}
-
-	var mu sync.Mutex
-	m := make(map[string]brokerOrErr)
-
-	var wg sync.WaitGroup
-	for uniqName := range uniq {
-		myName := uniqName
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-			coordinator, err := cl.loadCoordinator(cl.ctx, coordinatorKey{
-				name: myName,
-				typ:  typ,
-			})
-
-			mu.Lock()
-			defer mu.Unlock()
-			m[myName] = brokerOrErr{coordinator, err}
-		}()
-	}
-	wg.Wait()
-
-	return m
-}
-
 func (cl *Client) handleAdminReq(ctx context.Context, req kmsg.Request) ResponseShard {
 	// Loading a controller can perform some wait; we accept that and do
 	// not account for the retries or the time to load the controller as
@@ -1411,10 +1438,7 @@ func (cl *Client) handleCoordinatorReq(ctx context.Context, req kmsg.Request) Re
 // coordinator is deleted.
 func (cl *Client) handleCoordinatorReqSimple(ctx context.Context, typ int8, name string, req kmsg.Request) ResponseShard {
 	coordinator, resp, err := cl.handleReqWithCoordinator(ctx, func() (*broker, error) {
-		return cl.loadCoordinator(ctx, coordinatorKey{
-			name: name,
-			typ:  typ,
-		})
+		return cl.loadCoordinator(ctx, typ, name)
 	}, typ, name, req)
 	return shard(coordinator, req, resp, err)
 }
@@ -1597,6 +1621,14 @@ type issueShard struct {
 
 // sharder splits a request.
 type sharder interface {
+	// If a request originally was not batched, then the protocol switched
+	// to being batched, we always try batched first then fallback.
+	//
+	// Requests that make this switch should always return pinReq requests,
+	// and we must unpack the pinReq to return to end users / use
+	// internally.
+	unpackPinReq() bool
+
 	// shard splits a request and returns the requests to issue tied to the
 	// brokers to issue the requests to. This can return an error if there
 	// is some pre-loading that needs to happen. If an error is returned,

@@ -1605,7 +1637,10 @@ type sharder interface {
 	// Due to sharded requests not being retriable if a response is
 	// received, to avoid stale coordinator errors, this function should
 	// not use any previously cached metadata.
-	shard(context.Context, kmsg.Request) ([]issueShard, bool, error)
+	//
+	// This takes the last error if the request is being retried, which is
+	// currently only useful for errBrokerTooOld.
+	shard(context.Context, kmsg.Request, error) ([]issueShard, bool, error)
 
 	// onResp is called on a successful response to investigate the
 	// response and potentially perform cleanup, and potentially returns an
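The unpackPinReq/shard contract above amounts to: always return the pinned batched form first, and only when a piece comes back with errBrokerTooOld return the split, version-capped singles. A self-contained, hedged sketch of that shape (every type and name below is an illustrative stand-in, not the unexported kgo types):

package example

import (
    "context"
    "errors"
)

var errBrokerTooOld = errors.New("broker too old") // stand-in for kgo's internal sentinel

// piece is a stand-in for issueShard: a request plus where it may be sent.
type piece struct {
    req       any
    anyBroker bool
}

// shardPinThenSplit models the pin-then-split contract: batched first (and
// reshardable), split singles after an errBrokerTooOld retry (not reshardable).
func shardPinThenSplit(_ context.Context, batched any, lastErr error, split func(any) []any) ([]piece, bool, error) {
    if !errors.Is(lastErr, errBrokerTooOld) {
        return []piece{{req: batched, anyBroker: true}}, true, nil
    }
    var pieces []piece
    for _, s := range split(batched) {
        pieces = append(pieces, piece{req: s, anyBroker: true})
    }
    return pieces, false, nil
}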
@@ -1628,6 +1663,8 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
 		sharder = &listOffsetsSharder{cl}
 	case *kmsg.OffsetFetchRequest:
 		sharder = &offsetFetchSharder{cl}
+	case *kmsg.FindCoordinatorRequest:
+		sharder = &findCoordinatorSharder{cl}
 	case *kmsg.DescribeGroupsRequest:
 		sharder = &describeGroupsSharder{cl}
 	case *kmsg.ListGroupsRequest:

@@ -1660,8 +1697,9 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
 // again). reqTry tracks how many total tries a request piece has had;
 // we quit at either the max configured tries or max configured time.
 type reqTry struct {
-	tries int
-	req   kmsg.Request
+	tries   int
+	req     kmsg.Request
+	lastErr error
 }
 
 var (

@@ -1687,8 +1725,9 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
 	// issue is called to progressively split and issue requests.
 	//
 	// This recursively calls itself if a request fails and can be retried.
+	// We avoid stack problems because this calls itself in a goroutine.
 	issue = func(try reqTry) {
-		issues, reshardable, err := sharder.shard(ctx, try.req)
+		issues, reshardable, err := sharder.shard(ctx, try.req, try.lastErr)
 		if err != nil {
 			l.Log(LogLevelDebug, "unable to shard request", "previous_tries", try.tries, "err", err)
 			addShard(shard(nil, try.req, nil, err)) // failure to shard means data loading failed; this request is failed

@@ -1722,9 +1761,13 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
 
 		for i := range issues {
 			myIssue := issues[i]
+			myUnderlyingReq := myIssue.req
+			if sharder.unpackPinReq() {
+				myUnderlyingReq = myIssue.req.(*pinReq).Request
+			}
 
 			if myIssue.err != nil {
-				addShard(shard(nil, myIssue.req, nil, myIssue.err))
+				addShard(shard(nil, myUnderlyingReq, nil, myIssue.err))
 				continue
 			}
 

@@ -1741,20 +1784,22 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
 					broker, err = cl.brokerOrErr(ctx, myIssue.broker, errUnknownBroker)
 				}
 				if err != nil {
-					addShard(shard(nil, myIssue.req, nil, err)) // failure to load a broker is a failure to issue a request
+					addShard(shard(nil, myUnderlyingReq, nil, err)) // failure to load a broker is a failure to issue a request
 					return
 				}
 
 				resp, err := broker.waitResp(ctx, myIssue.req)
 				if err == nil {
-					err = sharder.onResp(myIssue.req, resp) // perform some potential cleanup, and potentially receive an error to retry
+					err = sharder.onResp(myUnderlyingReq, resp) // perform some potential cleanup, and potentially receive an error to retry
 				}
 
 				// If we failed to issue the request, we *maybe* will retry.
 				// We could have failed to even issue the request or receive
 				// a response, which is retriable.
 				backoff := cl.cfg.retryBackoff(tries)
-				if err != nil && (retryTimeout == 0 || time.Now().Add(backoff).Sub(start) < retryTimeout) && cl.shouldRetry(tries, err) && cl.waitTries(ctx, backoff) {
+				if err != nil &&
+					(retryTimeout == 0 || time.Now().Add(backoff).Sub(start) < retryTimeout) &&
+					(reshardable && sharder.unpackPinReq() && errors.Is(err, errBrokerTooOld) || cl.shouldRetry(tries, err) && cl.waitTries(ctx, backoff)) {
 					// Non-reshardable re-requests just jump back to the
 					// top where the broker is loaded. This is the case on
 					// requests where the original request is split to

@@ -1764,16 +1809,16 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
 						goto start
 					}
 					l.Log(LogLevelDebug, "sharded request failed, resharding and reissuing", "time_since_start", time.Since(start), "tries", try.tries, "err", err)
-					issue(reqTry{tries, myIssue.req})
+					issue(reqTry{tries, myUnderlyingReq, err})
 					return
 				}
 
-				addShard(shard(broker, myIssue.req, resp, err)) // the error was not retriable
+				addShard(shard(broker, myUnderlyingReq, resp, err)) // the error was not retriable
 			}()
 		}
 	}
 
-	issue(reqTry{0, req})
+	issue(reqTry{0, req, nil})
 	wg.Wait()
 
 	return shards, sharder.merge
@@ -2031,7 +2076,9 @@ func (l *unknownErrShards) collect(mkreq, mergeParts interface{}) []issueShard {
 // handles sharding ListOffsetsRequest
 type listOffsetsSharder struct{ *Client }
 
-func (cl *listOffsetsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) {
+func (*listOffsetsSharder) unpackPinReq() bool { return false }
+
+func (cl *listOffsetsSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) {
 	req := kreq.(*kmsg.ListOffsetsRequest)
 
 	// For listing offsets, we need the broker leader for each partition we

@@ -2157,6 +2204,8 @@ func (*listOffsetsSharder) merge(sresps []ResponseShard) (kmsg.Response, error)
 // handles sharding OffsetFetchRequest
 type offsetFetchSharder struct{ *Client }
 
+func (*offsetFetchSharder) unpackPinReq() bool { return true } // batch first, single group fallback
+
 func offsetFetchReqToGroup(req *kmsg.OffsetFetchRequest) kmsg.OffsetFetchRequestGroup {
 	g := kmsg.NewOffsetFetchRequestGroup()
 	g.Group = req.Group

@@ -2169,6 +2218,19 @@ func offsetFetchReqToGroup(req *kmsg.OffsetFetchRequest) kmsg.OffsetFetchRequest
 	return g
 }
 
+func offsetFetchGroupToReq(requireStable bool, group kmsg.OffsetFetchRequestGroup) *kmsg.OffsetFetchRequest {
+	req := kmsg.NewPtrOffsetFetchRequest()
+	req.RequireStable = requireStable
+	req.Group = group.Group
+	for _, topic := range group.Topics {
+		reqTopic := kmsg.NewOffsetFetchRequestTopic()
+		reqTopic.Topic = topic.Topic
+		reqTopic.Partitions = topic.Partitions
+		req.Topics = append(req.Topics, reqTopic)
+	}
+	return req
+}
+
 func offsetFetchRespToGroup(req *kmsg.OffsetFetchRequest, resp *kmsg.OffsetFetchResponse) kmsg.OffsetFetchResponseGroup {
 	g := kmsg.NewOffsetFetchResponseGroup()
 	g.Group = req.Group

@@ -2209,46 +2271,26 @@ func offsetFetchRespGroupIntoResp(g kmsg.OffsetFetchResponseGroup, into *kmsg.Of
 	}
 }
 
-func (cl *offsetFetchSharder) shard(_ context.Context, kreq kmsg.Request) ([]issueShard, bool, error) {
+func (cl *offsetFetchSharder) shard(ctx context.Context, kreq kmsg.Request, lastErr error) ([]issueShard, bool, error) {
 	req := kreq.(*kmsg.OffsetFetchRequest)
 
-	groups := make([]string, 0, len(req.Groups)+1)
-	if len(req.Groups) == 0 { // v0-v7
-		groups = append(groups, req.Group)
-	}
-	for i := range req.Groups { // v8+
-		groups = append(groups, req.Groups[i].Group)
-	}
-
-	coordinators := cl.loadCoordinators(coordinatorTypeGroup, groups...)
+	// We always try batching and only split at the end if lastErr
+	// indicates too old. We convert to batching immediately.
+	dup := *req
+	req = &dup
 
-	// If there is only the top level group, then we simply return our
-	// request mapped to its specific broker. For forward compatibility, we
-	// also embed the top level request into the Groups list: this allows
-	// operators of old request versions (v0-v7) to issue a v8 request
-	// appropriately. On response, if the length of groups is 1, we merge
-	// the first item back to the top level.
 	if len(req.Groups) == 0 {
-		berr := coordinators[req.Group]
-		if berr.err != nil {
-			return []issueShard{{ //nolint:nilerr // error is returned in the struct
-				req: req,
-				err: berr.err,
-			}}, false, nil // not reshardable, because this is an error
+		req.Groups = append(req.Groups, offsetFetchReqToGroup(req))
+	}
+	groups := make([]string, 0, len(req.Groups))
+	for i := range req.Groups {
+		if g := req.Groups[i].Group; len(g) > 0 {
+			groups = append(groups, req.Groups[i].Group)
 		}
-
-		dup := *req
-		brokerReq := &dup
-		brokerReq.Groups = append(brokerReq.Groups, offsetFetchReqToGroup(req))
-
-		return []issueShard{{
-			req:    brokerReq,
-			broker: berr.b.meta.NodeID,
-		}}, false, nil // reshardable to reload correct coordinator
 	}
 
-	// v8+ behavior: we have multiple groups.
-	//
+	coordinators := cl.loadCoordinators(ctx, coordinatorTypeGroup, groups...)
+
 	// Loading coordinators can have each group fail with its unique error,
 	// or with a kerr.Error that can be merged. Unique errors get their own
 	// failure shard, while kerr.Error's get merged.

@@ -2287,12 +2329,24 @@ func (cl *offsetFetchSharder) shard(_ context.Context, kreq kmsg.Request) ([]iss
 		}
 	}
 
+	splitReq := errors.Is(lastErr, errBrokerTooOld)
+
 	var issues []issueShard
 	for id, req := range brokerReqs {
-		issues = append(issues, issueShard{
-			req:    req,
-			broker: id,
-		})
+		if splitReq {
+			for _, group := range req.Groups {
+				req := offsetFetchGroupToReq(req.RequireStable, group)
+				issues = append(issues, issueShard{
+					req:    &pinReq{Request: req, pinMax: true, max: 7},
+					broker: id,
+				})
+			}
+		} else {
+			issues = append(issues, issueShard{
+				req:    &pinReq{Request: req, pinMin: true, min: 8},
+				broker: id,
+			})
+		}
 	}
 	for _, unkerr := range unkerrs {
 		issues = append(issues, issueShard{

@@ -2311,7 +2365,7 @@ func (cl *offsetFetchSharder) shard(_ context.Context, kreq kmsg.Request) ([]iss
 }
 
 func (cl *offsetFetchSharder) onResp(kreq kmsg.Request, kresp kmsg.Response) error {
-	req := kreq.(*kmsg.OffsetFetchRequest)
+	req := kreq.(*kmsg.OffsetFetchRequest) // we always issue pinned requests
 	resp := kresp.(*kmsg.OffsetFetchResponse)
 
 	switch len(resp.Groups) {
@@ -2365,13 +2419,106 @@ func (*offsetFetchSharder) merge(sresps []ResponseShard) (kmsg.Response, error)
 	})
 }
 
+// handles sharding FindCoordinatorRequest
+type findCoordinatorSharder struct{ *Client }
+
+func (*findCoordinatorSharder) unpackPinReq() bool { return true } // batch first, single key fallback
+
+func findCoordinatorRespCoordinatorIntoResp(c kmsg.FindCoordinatorResponseCoordinator, into *kmsg.FindCoordinatorResponse) {
+	into.NodeID = c.NodeID
+	into.Host = c.Host
+	into.Port = c.Port
+	into.ErrorCode = c.ErrorCode
+	into.ErrorMessage = c.ErrorMessage
+}
+
+func (*findCoordinatorSharder) shard(_ context.Context, kreq kmsg.Request, lastErr error) ([]issueShard, bool, error) {
+	req := kreq.(*kmsg.FindCoordinatorRequest)
+
+	// We always try batching and only split at the end if lastErr
+	// indicates too old. We convert to batching immediately.
+	dup := *req
+	req = &dup
+
+	uniq := make(map[string]struct{}, len(req.CoordinatorKeys))
+	uniq[req.CoordinatorKey] = struct{}{}
+	for _, key := range req.CoordinatorKeys {
+		uniq[key] = struct{}{}
+	}
+	req.CoordinatorKeys = req.CoordinatorKeys[:0]
+	for key := range uniq {
+		if len(key) > 0 {
+			req.CoordinatorKeys = append(req.CoordinatorKeys, key)
+		}
+	}
+
+	splitReq := errors.Is(lastErr, errBrokerTooOld)
+	if !splitReq {
+		return []issueShard{{
+			req: &pinReq{Request: req, pinMin: true, min: 4},
+			any: true,
+		}}, true, nil // this is "reshardable", in that we will split the request next
+	}
+
+	var issues []issueShard
+	for _, key := range req.CoordinatorKeys {
+		sreq := kmsg.NewPtrFindCoordinatorRequest()
+		sreq.CoordinatorType = req.CoordinatorType
+		sreq.CoordinatorKey = key
+		issues = append(issues, issueShard{
+			req: &pinReq{Request: sreq, pinMax: true, max: 3},
+			any: true,
+		})
+	}
+	return issues, false, nil // not reshardable
+}
+
+func (*findCoordinatorSharder) onResp(kreq kmsg.Request, kresp kmsg.Response) error {
+	req := kreq.(*kmsg.FindCoordinatorRequest) // we always issue pinned requests
+	resp := kresp.(*kmsg.FindCoordinatorResponse)
+
+	switch len(resp.Coordinators) {
+	case 0:
+		// Convert v3 and prior to v4+
+		rc := kmsg.NewFindCoordinatorResponseCoordinator()
+		rc.Key = req.CoordinatorKey
+		rc.NodeID = resp.NodeID
+		rc.Host = resp.Host
+		rc.Port = resp.Port
+		rc.ErrorCode = resp.ErrorCode
+		rc.ErrorMessage = resp.ErrorMessage
+		resp.Coordinators = append(resp.Coordinators, rc)
+	case 1:
+		// Convert v4 to v3 and prior
+		findCoordinatorRespCoordinatorIntoResp(resp.Coordinators[0], resp)
+	}
+
+	return nil
+}
+
+func (*findCoordinatorSharder) merge(sresps []ResponseShard) (kmsg.Response, error) {
+	merged := kmsg.NewPtrFindCoordinatorResponse()
+	return merged, firstErrMerger(sresps, func(kresp kmsg.Response) {
+		resp := kresp.(*kmsg.FindCoordinatorResponse)
+		merged.Version = resp.Version
+		merged.ThrottleMillis = resp.ThrottleMillis
+		merged.Coordinators = append(merged.Coordinators, resp.Coordinators...)
+
+		if len(resp.Coordinators) == 1 {
+			findCoordinatorRespCoordinatorIntoResp(resp.Coordinators[0], merged)
+		}
+	})
+}
+
 // handles sharding DescribeGroupsRequest
 type describeGroupsSharder struct{ *Client }
 
-func (cl *describeGroupsSharder) shard(_ context.Context, kreq kmsg.Request) ([]issueShard, bool, error) {
+func (*describeGroupsSharder) unpackPinReq() bool { return false }
+
+func (cl *describeGroupsSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) {
 	req := kreq.(*kmsg.DescribeGroupsRequest)
 
-	coordinators := cl.loadCoordinators(coordinatorTypeGroup, req.Groups...)
+	coordinators := cl.loadCoordinators(ctx, coordinatorTypeGroup, req.Groups...)
 	type unkerr struct {
 		err   error
 		group string
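A hedged usage sketch of the sharded form for this request (seed address and keys are illustrative): each returned shard carries the per-broker request, response, and error, whether the pieces came from one batched v4+ request or from the split single-key fallback.

package main

import (
    "context"
    "fmt"

    "github.com/twmb/franz-go/pkg/kgo"
    "github.com/twmb/franz-go/pkg/kmsg"
)

func main() {
    cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092")) // illustrative seed broker
    if err != nil {
        panic(err)
    }
    defer cl.Close()

    req := kmsg.NewPtrFindCoordinatorRequest()
    req.CoordinatorType = 0 // group coordinators
    req.CoordinatorKeys = []string{"group-a", "group-b", "group-c"}

    for _, shard := range cl.RequestSharded(context.Background(), req) {
        if shard.Err != nil {
            fmt.Println("shard failed:", shard.Err)
            continue
        }
        resp := shard.Resp.(*kmsg.FindCoordinatorResponse)
        for _, c := range resp.Coordinators {
            fmt.Println(c.Key, "->", c.NodeID)
        }
    }
}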
@@ -2455,7 +2602,9 @@ func (*describeGroupsSharder) merge(sresps []ResponseShard) (kmsg.Response, erro
 // handles sharding ListGroupsRequest
 type listGroupsSharder struct{ *Client }
 
-func (cl *listGroupsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) {
+func (*listGroupsSharder) unpackPinReq() bool { return false }
+
+func (cl *listGroupsSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) {
 	req := kreq.(*kmsg.ListGroupsRequest)
 	return cl.allBrokersShardedReq(ctx, func() kmsg.Request {
 		dup := *req

@@ -2484,7 +2633,9 @@ func (*listGroupsSharder) merge(sresps []ResponseShard) (kmsg.Response, error) {
 // handle sharding DeleteRecordsRequest
 type deleteRecordsSharder struct{ *Client }
 
-func (cl *deleteRecordsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) {
+func (*deleteRecordsSharder) unpackPinReq() bool { return false }
+
+func (cl *deleteRecordsSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) {
 	req := kreq.(*kmsg.DeleteRecordsRequest)
 
 	var need []string

@@ -2601,7 +2752,9 @@ func (*deleteRecordsSharder) merge(sresps []ResponseShard) (kmsg.Response, error
 // handle sharding OffsetForLeaderEpochRequest
 type offsetForLeaderEpochSharder struct{ *Client }
 
-func (cl *offsetForLeaderEpochSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) {
+func (*offsetForLeaderEpochSharder) unpackPinReq() bool { return false }
+
+func (cl *offsetForLeaderEpochSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) {
 	req := kreq.(*kmsg.OffsetForLeaderEpochRequest)
 
 	var need []string

@@ -2718,7 +2871,9 @@ func (*offsetForLeaderEpochSharder) merge(sresps []ResponseShard) (kmsg.Response
 // handle sharding DescribeConfigsRequest
 type describeConfigsSharder struct{ *Client }
 
-func (*describeConfigsSharder) shard(_ context.Context, kreq kmsg.Request) ([]issueShard, bool, error) {
+func (*describeConfigsSharder) unpackPinReq() bool { return false }
+
+func (*describeConfigsSharder) shard(_ context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) {
 	req := kreq.(*kmsg.DescribeConfigsRequest)
 
 	brokerReqs := make(map[int32][]kmsg.DescribeConfigsRequestResource)

@@ -2783,7 +2938,9 @@ func (*describeConfigsSharder) merge(sresps []ResponseShard) (kmsg.Response, err
 // handle sharding AlterConfigsRequest
 type alterConfigsSharder struct{ *Client }
 
-func (*alterConfigsSharder) shard(_ context.Context, kreq kmsg.Request) ([]issueShard, bool, error) {
+func (*alterConfigsSharder) unpackPinReq() bool { return false }
+
+func (*alterConfigsSharder) shard(_ context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) {
 	req := kreq.(*kmsg.AlterConfigsRequest)
 
 	brokerReqs := make(map[int32][]kmsg.AlterConfigsRequestResource)

@@ -2846,7 +3003,9 @@ func (*alterConfigsSharder) merge(sresps []ResponseShard) (kmsg.Response, error)
 // handles sharding AlterReplicaLogDirsRequest
 type alterReplicaLogDirsSharder struct{ *Client }
 
-func (cl *alterReplicaLogDirsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) {
+func (*alterReplicaLogDirsSharder) unpackPinReq() bool { return false }
+
+func (cl *alterReplicaLogDirsSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) {
 	req := kreq.(*kmsg.AlterReplicaLogDirsRequest)
 
 	needMap := make(map[string]struct{})

@@ -2991,7 +3150,9 @@ func (*alterReplicaLogDirsSharder) merge(sresps []ResponseShard) (kmsg.Response,
 // handles sharding DescribeLogDirsRequest
 type describeLogDirsSharder struct{ *Client }
 
-func (cl *describeLogDirsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) {
+func (*describeLogDirsSharder) unpackPinReq() bool { return false }
+
+func (cl *describeLogDirsSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) {
 	req := kreq.(*kmsg.DescribeLogDirsRequest)
 
 	// If req.Topics is nil, the request is to describe all logdirs. Thus,

@@ -3106,10 +3267,12 @@ func (*describeLogDirsSharder) merge(sresps []ResponseShard) (kmsg.Response, err
 // handles sharding DeleteGroupsRequest
 type deleteGroupsSharder struct{ *Client }
 
-func (cl *deleteGroupsSharder) shard(_ context.Context, kreq kmsg.Request) ([]issueShard, bool, error) {
+func (*deleteGroupsSharder) unpackPinReq() bool { return false }
+
+func (cl *deleteGroupsSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) {
 	req := kreq.(*kmsg.DeleteGroupsRequest)
 
-	coordinators := cl.loadCoordinators(coordinatorTypeGroup, req.Groups...)
+	coordinators := cl.loadCoordinators(ctx, coordinatorTypeGroup, req.Groups...)
 	type unkerr struct {
 		err   error
 		group string

@@ -3192,7 +3355,9 @@ func (*deleteGroupsSharder) merge(sresps []ResponseShard) (kmsg.Response, error)
 // handle sharding IncrementalAlterConfigsRequest
 type incrementalAlterConfigsSharder struct{ *Client }
 
-func (*incrementalAlterConfigsSharder) shard(_ context.Context, kreq kmsg.Request) ([]issueShard, bool, error) {
+func (*incrementalAlterConfigsSharder) unpackPinReq() bool { return false }
+
+func (*incrementalAlterConfigsSharder) shard(_ context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) {
 	req := kreq.(*kmsg.IncrementalAlterConfigsRequest)
 
 	brokerReqs := make(map[int32][]kmsg.IncrementalAlterConfigsRequestResource)

@@ -3255,7 +3420,9 @@ func (*incrementalAlterConfigsSharder) merge(sresps []ResponseShard) (kmsg.Respo
 // handle sharding DescribeProducersRequest
 type describeProducersSharder struct{ *Client }
 
-func (cl *describeProducersSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) {
+func (*describeProducersSharder) unpackPinReq() bool { return false }
+
+func (cl *describeProducersSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) {
 	req := kreq.(*kmsg.DescribeProducersRequest)
 
 	var need []string

@@ -3363,10 +3530,12 @@ func (*describeProducersSharder) merge(sresps []ResponseShard) (kmsg.Response, e
 // handles sharding DescribeTransactionsRequest
 type describeTransactionsSharder struct{ *Client }
 
-func (cl *describeTransactionsSharder) shard(_ context.Context, kreq kmsg.Request) ([]issueShard, bool, error) {
+func (*describeTransactionsSharder) unpackPinReq() bool { return false }
+
+func (cl *describeTransactionsSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) {
 	req := kreq.(*kmsg.DescribeTransactionsRequest)
 
-	coordinators := cl.loadCoordinators(coordinatorTypeTxn, req.TransactionalIDs...)
+	coordinators := cl.loadCoordinators(ctx, coordinatorTypeTxn, req.TransactionalIDs...)
 	type unkerr struct {
 		err   error
 		txnID string

@@ -3449,7 +3618,9 @@ func (*describeTransactionsSharder) merge(sresps []ResponseShard) (kmsg.Response
 // handles sharding ListTransactionsRequest
 type listTransactionsSharder struct{ *Client }
 
-func (cl *listTransactionsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) {
+func (*listTransactionsSharder) unpackPinReq() bool { return false }
+
+func (cl *listTransactionsSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) {
 	req := kreq.(*kmsg.ListTransactionsRequest)
 	return cl.allBrokersShardedReq(ctx, func() kmsg.Request {
 		dup := *req
