Skip to content

Commit 7d050fc

Browse files
committedDec 21, 2023
kgo: do not cancel FindCoordinator if the parent context cancels
Some load testing in Redpanda showed a failure where consuming quit unexpectedly and unrecoverably. The sequence of events is:

* if OffsetCommit is issued just before Heartbeat
* and the group needs to be loaded so FindCoordinator is triggered,
* and OffsetCommit happens again, canceling the prior commit's context

Then,

* FindCoordinator would cancel
* Heartbeat, which is waiting on the same load, would fail with context.Canceled
* This error is seen as a group leave error
* The group management logic would quit entirely.

Now, the context used for FindCoordinator is the client context, which is only closed on client close. This is also better anyway -- if two requests are waiting for the same coordinator load, we don't want the first request canceling to error the second request. If all requests cancel and we have a stray FindCoordinator in flight, that's ok too, because, well, worst case we'll just eventually have a little bit of extra data cached that is likely needed in the future anyway.

Closes redpanda-data/redpanda#15131
1 parent b8b065d commit 7d050fc

File tree

1 file changed

+42
-5
lines changed

1 file changed

+42
-5
lines changed
 

‎pkg/kgo/client.go

+42-5
Original file line numberDiff line numberDiff line change
@@ -1505,6 +1505,31 @@ func (cl *Client) loadCoordinator(ctx context.Context, typ int8, key string) (*b
15051505
}
15061506

15071507
func (cl *Client) loadCoordinators(ctx context.Context, typ int8, keys ...string) map[string]brokerOrErr {
1508+
mch := make(chan map[string]brokerOrErr, 1)
1509+
go func() { mch <- cl.doLoadCoordinators(ctx, typ, keys...) }()
1510+
select {
1511+
case m := <-mch:
1512+
return m
1513+
case <-ctx.Done():
1514+
m := make(map[string]brokerOrErr, len(keys))
1515+
for _, k := range keys {
1516+
m[k] = brokerOrErr{nil, ctx.Err()}
1517+
}
1518+
return m
1519+
}
1520+
}
1521+
1522+
// doLoadCoordinators uses the caller context to cancel loading metadata
1523+
// (brokerOrErr), but we use the client context to actually issue the request.
1524+
// There should be only one direct call to doLoadCoordinators, just above in
1525+
// loadCoordinators. It is possible for two requests to be loading the same
1526+
// coordinator (in fact, that's the point of this function -- collapse these
1527+
// requests). We do not want the first request canceling its context to cause
1528+
// errors for the second request.
1529+
//
1530+
// It is ok to leave FindCoordinator running even if the caller quits. Worst
1531+
// case, we just cache things for some time in the future; yay.
1532+
func (cl *Client) doLoadCoordinators(ctx context.Context, typ int8, keys ...string) map[string]brokerOrErr {
15081533
m := make(map[string]brokerOrErr, len(keys))
15091534
if len(keys) == 0 {
15101535
return m
@@ -1575,7 +1600,12 @@ func (cl *Client) loadCoordinators(ctx context.Context, typ int8, keys ...string
15751600
}
15761601
}
15771602

1578-
shards := cl.RequestSharded(ctx, req)
1603+
cl.cfg.logger.Log(LogLevelDebug, "prepared to issue find coordinator request",
1604+
"coordinator_type", typ,
1605+
"coordinator_keys", req.CoordinatorKeys,
1606+
)
1607+
1608+
shards := cl.RequestSharded(cl.ctx, req)
15791609

15801610
for _, shard := range shards {
15811611
if shard.Err != nil {
@@ -1674,10 +1704,17 @@ func (cl *Client) maybeDeleteStaleCoordinator(name string, typ int8, err error)
16741704
func (cl *Client) deleteStaleCoordinator(name string, typ int8) {
16751705
cl.coordinatorsMu.Lock()
16761706
defer cl.coordinatorsMu.Unlock()
1677-
delete(cl.coordinators, coordinatorKey{
1678-
name: name,
1679-
typ: typ,
1680-
})
1707+
k := coordinatorKey{name, typ}
1708+
v := cl.coordinators[k]
1709+
if v == nil {
1710+
return
1711+
}
1712+
select {
1713+
case <-v.loadWait:
1714+
delete(cl.coordinators, k)
1715+
default:
1716+
// We are actively reloading this coordinator.
1717+
}
16811718
}
16821719

16831720
type brokerOrErr struct {

0 commit comments

Comments
 (0)
Please sign in to comment.