Skip to content

Commit

Permalink
Merge pull request #650 from twmb/find_coordinator_ctx
Browse files Browse the repository at this point in the history
kgo: do not cancel FindCoordinator if the parent context cancels
  • Loading branch information
twmb committed Dec 21, 2023
2 parents 2907ba9 + 7d050fc commit d269dad
Showing 1 changed file with 42 additions and 5 deletions.
47 changes: 42 additions & 5 deletions pkg/kgo/client.go
Expand Up @@ -1505,6 +1505,31 @@ func (cl *Client) loadCoordinator(ctx context.Context, typ int8, key string) (*b
}

func (cl *Client) loadCoordinators(ctx context.Context, typ int8, keys ...string) map[string]brokerOrErr {
mch := make(chan map[string]brokerOrErr, 1)
go func() { mch <- cl.doLoadCoordinators(ctx, typ, keys...) }()
select {
case m := <-mch:
return m
case <-ctx.Done():
m := make(map[string]brokerOrErr, len(keys))
for _, k := range keys {
m[k] = brokerOrErr{nil, ctx.Err()}
}
return m
}
}

// doLoadCoordinators uses the caller context to cancel loading metadata
// (brokerOrErr), but we use the client context to actually issue the request.
// There should be only one direct call to doLoadCoordinators, just above in
// loadCoordinator. It is possible for two requests to be loading the same
// coordinator (in fact, that's the point of this function -- collapse these
// requests). We do not want the first request canceling it's context to cause
// errors for the second request.
//
// It is ok to leave FindCoordinator running even if the caller quits. Worst
// case, we just cache things for some time in the future; yay.
func (cl *Client) doLoadCoordinators(ctx context.Context, typ int8, keys ...string) map[string]brokerOrErr {
m := make(map[string]brokerOrErr, len(keys))
if len(keys) == 0 {
return m
Expand Down Expand Up @@ -1575,7 +1600,12 @@ func (cl *Client) loadCoordinators(ctx context.Context, typ int8, keys ...string
}
}

shards := cl.RequestSharded(ctx, req)
cl.cfg.logger.Log(LogLevelDebug, "prepared to issue find coordinator request",
"coordinator_type", typ,
"coordinator_keys", req.CoordinatorKeys,
)

shards := cl.RequestSharded(cl.ctx, req)

for _, shard := range shards {
if shard.Err != nil {
Expand Down Expand Up @@ -1674,10 +1704,17 @@ func (cl *Client) maybeDeleteStaleCoordinator(name string, typ int8, err error)
func (cl *Client) deleteStaleCoordinator(name string, typ int8) {
cl.coordinatorsMu.Lock()
defer cl.coordinatorsMu.Unlock()
delete(cl.coordinators, coordinatorKey{
name: name,
typ: typ,
})
k := coordinatorKey{name, typ}
v := cl.coordinators[k]
if v == nil {
return
}
select {
case <-v.loadWait:
delete(cl.coordinators, k)
default:
// We are actively reloading this coordinator.
}
}

type brokerOrErr struct {
Expand Down

0 comments on commit d269dad

Please sign in to comment.