diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 07bb9adc..3f9fe29d 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -1505,6 +1505,31 @@ func (cl *Client) loadCoordinator(ctx context.Context, typ int8, key string) (*b } func (cl *Client) loadCoordinators(ctx context.Context, typ int8, keys ...string) map[string]brokerOrErr { + mch := make(chan map[string]brokerOrErr, 1) + go func() { mch <- cl.doLoadCoordinators(ctx, typ, keys...) }() + select { + case m := <-mch: + return m + case <-ctx.Done(): + m := make(map[string]brokerOrErr, len(keys)) + for _, k := range keys { + m[k] = brokerOrErr{nil, ctx.Err()} + } + return m + } +} + +// doLoadCoordinators uses the caller context to cancel loading metadata +// (brokerOrErr), but we use the client context to actually issue the request. +// There should be only one direct call to doLoadCoordinators, just above in +// loadCoordinator. It is possible for two requests to be loading the same +// coordinator (in fact, that's the point of this function -- collapse these +// requests). We do not want the first request canceling it's context to cause +// errors for the second request. +// +// It is ok to leave FindCoordinator running even if the caller quits. Worst +// case, we just cache things for some time in the future; yay. +func (cl *Client) doLoadCoordinators(ctx context.Context, typ int8, keys ...string) map[string]brokerOrErr { m := make(map[string]brokerOrErr, len(keys)) if len(keys) == 0 { return m @@ -1575,7 +1600,12 @@ func (cl *Client) loadCoordinators(ctx context.Context, typ int8, keys ...string } } - shards := cl.RequestSharded(ctx, req) + cl.cfg.logger.Log(LogLevelDebug, "prepared to issue find coordinator request", + "coordinator_type", typ, + "coordinator_keys", req.CoordinatorKeys, + ) + + shards := cl.RequestSharded(cl.ctx, req) for _, shard := range shards { if shard.Err != nil { @@ -1674,10 +1704,17 @@ func (cl *Client) maybeDeleteStaleCoordinator(name string, typ int8, err error) func (cl *Client) deleteStaleCoordinator(name string, typ int8) { cl.coordinatorsMu.Lock() defer cl.coordinatorsMu.Unlock() - delete(cl.coordinators, coordinatorKey{ - name: name, - typ: typ, - }) + k := coordinatorKey{name, typ} + v := cl.coordinators[k] + if v == nil { + return + } + select { + case <-v.loadWait: + delete(cl.coordinators, k) + default: + // We are actively reloading this coordinator. + } } type brokerOrErr struct {