From 0ecb52bc2e9a0596c10c3995a2538d086572e69f Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Fri, 20 Oct 2023 20:55:00 -0600 Subject: [PATCH] kgo: do not rotate the consumer session when pausing topics/partitions Issue #489 asked to stop returning data after a partition was paused -- the original implementation of pausing kept returning any data that was in flight or already buffered, and simply stopped fetching new data. 489 was dealt with by bumping the consumer session, which kills all in flight fetch requests. This was easy, but can cause a lot of connection churn if pausing and resuming a lot -- which is #585. The new implementation allows fetches to complete, but strips data from fetches based on what is paused at the moment the fetches are being returned to the client. This does make polling paused fetches very slightly slower (a map lookup per partition), but there's only so much that's possible. If a partition is paused, we drop the data and do not advance the internal offset. If a partition is not paused, we keep the data and return it -- same as before. --- pkg/kgo/consumer.go | 38 +++----------- pkg/kgo/consumer_direct_test.go | 9 +++- pkg/kgo/source.go | 88 ++++++++++++++++++++++++++++++-- pkg/kgo/topics_and_partitions.go | 8 +++ 4 files changed, 105 insertions(+), 38 deletions(-) diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index 475270ed..9fdf9bde 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -433,6 +433,8 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches { }() } + paused := c.loadPaused() + // A group can grab the consumer lock then the group mu and // assign partitions. The group mu is grabbed to update its // uncommitted map. Assigning partitions clears sources ready @@ -451,13 +453,13 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches { c.sourcesReadyMu.Lock() if maxPollRecords < 0 { for _, ready := range c.sourcesReadyForDraining { - fetches = append(fetches, ready.takeBuffered()) + fetches = append(fetches, ready.takeBuffered(paused)) } c.sourcesReadyForDraining = nil } else { for len(c.sourcesReadyForDraining) > 0 && maxPollRecords > 0 { source := c.sourcesReadyForDraining[0] - fetch, taken, drained := source.takeNBuffered(maxPollRecords) + fetch, taken, drained := source.takeNBuffered(paused, maxPollRecords) if drained { c.sourcesReadyForDraining = c.sourcesReadyForDraining[1:] } @@ -555,9 +557,7 @@ func (cl *Client) UpdateFetchMaxBytes(maxBytes, maxPartBytes int32) { // PauseFetchTopics sets the client to no longer fetch the given topics and // returns all currently paused topics. Paused topics persist until resumed. // You can call this function with no topics to simply receive the list of -// currently paused topics. Pausing topics drops everything currently buffered -// and kills any in flight fetch requests to ensure nothing that is paused -// can be returned anymore from polling. +// currently paused topics. // // Pausing topics is independent from pausing individual partitions with the // PauseFetchPartitions method. If you pause partitions for a topic with @@ -569,15 +569,8 @@ func (cl *Client) PauseFetchTopics(topics ...string) []string { if len(topics) == 0 { return c.loadPaused().pausedTopics() } - c.pausedMu.Lock() defer c.pausedMu.Unlock() - defer func() { - c.mu.Lock() - defer c.mu.Unlock() - c.assignPartitions(nil, assignBumpSession, nil, fmt.Sprintf("pausing fetch topics %v", topics)) - }() - paused := c.clonePaused() paused.addTopics(topics...) c.storePaused(paused) @@ -587,9 +580,7 @@ func (cl *Client) PauseFetchTopics(topics ...string) []string { // PauseFetchPartitions sets the client to no longer fetch the given partitions // and returns all currently paused partitions. Paused partitions persist until // resumed. You can call this function with no partitions to simply receive the -// list of currently paused partitions. Pausing partitions drops everything -// currently buffered and kills any in flight fetch requests to ensure nothing -// that is paused can be returned anymore from polling. +// list of currently paused partitions. // // Pausing individual partitions is independent from pausing topics with the // PauseFetchTopics method. If you pause partitions for a topic with @@ -601,15 +592,8 @@ func (cl *Client) PauseFetchPartitions(topicPartitions map[string][]int32) map[s if len(topicPartitions) == 0 { return c.loadPaused().pausedPartitions() } - c.pausedMu.Lock() defer c.pausedMu.Unlock() - defer func() { - c.mu.Lock() - defer c.mu.Unlock() - c.assignPartitions(nil, assignBumpSession, nil, fmt.Sprintf("pausing fetch partitions %v", topicPartitions)) - }() - paused := c.clonePaused() paused.addPartitions(topicPartitions) c.storePaused(paused) @@ -884,10 +868,6 @@ const ( // The counterpart to assignInvalidateMatching, assignSetMatching // resets all matching partitions to the specified offset / epoch. assignSetMatching - - // For pausing, we want to drop anything inflight. We start a new - // session with the old tps. - assignBumpSession ) func (h assignHow) String() string { @@ -902,8 +882,6 @@ func (h assignHow) String() string { return "unassigning and purging any partition matching the input topics" case assignSetMatching: return "reassigning any currently assigned matching partition to the input" - case assignBumpSession: - return "bumping internal consumer session to drop anything currently in flight" } return "" } @@ -984,8 +962,6 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how // if we had no session before, which is why we need to pass in // our topicPartitions. session = c.guardSessionChange(tps) - } else if how == assignBumpSession { - loadOffsets, tps = c.stopSession() } else { loadOffsets, _ = c.stopSession() @@ -1032,7 +1008,7 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how // assignment went straight to listing / epoch loading, and // that list/epoch never finished. switch how { - case assignWithoutInvalidating, assignBumpSession: + case assignWithoutInvalidating: // Nothing to do -- this is handled above. case assignInvalidateAll: loadOffsets = listOrEpochLoads{} diff --git a/pkg/kgo/consumer_direct_test.go b/pkg/kgo/consumer_direct_test.go index 548ab575..ac385e8e 100644 --- a/pkg/kgo/consumer_direct_test.go +++ b/pkg/kgo/consumer_direct_test.go @@ -307,8 +307,13 @@ func TestPauseIssue489(t *testing.T) { } cl.PauseFetchPartitions(map[string][]int32{t1: {0}}) sawZero, sawOne = false, false - for i := 0; i < 5; i++ { - fs := cl.PollFetches(ctx) + for i := 0; i < 10; i++ { + var fs Fetches + if i < 5 { + fs = cl.PollFetches(ctx) + } else { + fs = cl.PollRecords(ctx, 2) + } fs.EachRecord(func(r *Record) { sawZero = sawZero || r.Partition == 0 sawOne = sawOne || r.Partition == 1 diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index 226ef458..3086e60d 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -344,8 +344,64 @@ func (s *source) hook(f *Fetch, buffered, polled bool) { } // takeBuffered drains a buffered fetch and updates offsets. -func (s *source) takeBuffered() Fetch { - return s.takeBufferedFn(true, usedOffsets.finishUsingAllWithSet) +func (s *source) takeBuffered(paused pausedTopics) Fetch { + if len(paused) == 0 { + return s.takeBufferedFn(true, usedOffsets.finishUsingAllWithSet) + } + var strip map[string]map[int32]struct{} + f := s.takeBufferedFn(true, func(os usedOffsets) { + for t, ps := range os { + // If the entire topic is paused, we allowUsable all + // and strip the topic entirely. + pps, ok := paused.t(t) + if !ok { + continue + } + if strip == nil { + strip = make(map[string]map[int32]struct{}) + } + if pps.all { + for _, o := range ps { + o.from.allowUsable() + } + strip[t] = nil // initialize key, for existence-but-len-0 check below + continue + } + stript := make(map[int32]struct{}) + strip[t] = stript + for _, o := range ps { + if _, ok := pps.m[o.from.partition]; ok { + o.from.allowUsable() + stript[o.from.partition] = struct{}{} + continue + } + o.from.setOffset(o.cursorOffset) + o.from.allowUsable() + } + } + }) + if strip != nil { + keep := f.Topics[:0] + for _, t := range f.Topics { + stript, ok := strip[t.Topic] + if ok { + if len(stript) == 0 { + continue // stripping this entire topic + } + keepp := t.Partitions[:0] + for _, p := range t.Partitions { + if _, ok := stript[p.Partition]; ok { + continue + } + keepp = append(keepp, p) + } + t.Partitions = keepp + } + keep = append(keep, t) + } + f.Topics = keep + } + return f } func (s *source) discardBuffered() { @@ -359,7 +415,7 @@ func (s *source) discardBuffered() { // // This returns the number of records taken and whether the source has been // completely drained. -func (s *source) takeNBuffered(n int) (Fetch, int, bool) { +func (s *source) takeNBuffered(paused pausedTopics, n int) (Fetch, int, bool) { var r Fetch var taken int @@ -368,6 +424,17 @@ func (s *source) takeNBuffered(n int) (Fetch, int, bool) { for len(bf.Topics) > 0 && n > 0 { t := &bf.Topics[0] + // If the topic is outright paused, we allowUsable all + // partitions in the topic and skip the topic entirely. + if paused.has(t.Topic, -1) { + bf.Topics = bf.Topics[1:] + for _, pCursor := range b.usedOffsets[t.Topic] { + pCursor.from.allowUsable() + } + delete(b.usedOffsets, t.Topic) + continue + } + r.Topics = append(r.Topics, *t) rt := &r.Topics[len(r.Topics)-1] rt.Partitions = nil @@ -377,6 +444,17 @@ func (s *source) takeNBuffered(n int) (Fetch, int, bool) { for len(t.Partitions) > 0 && n > 0 { p := &t.Partitions[0] + if paused.has(t.Topic, p.Partition) { + t.Partitions = t.Partitions[1:] + pCursor := tCursors[p.Partition] + pCursor.from.allowUsable() + delete(tCursors, p.Partition) + if len(tCursors) == 0 { + delete(b.usedOffsets, t.Topic) + } + continue + } + rt.Partitions = append(rt.Partitions, *p) rp := &rt.Partitions[len(rt.Partitions)-1] @@ -402,7 +480,7 @@ func (s *source) takeNBuffered(n int) (Fetch, int, bool) { if len(tCursors) == 0 { delete(b.usedOffsets, t.Topic) } - break + continue } lastReturnedRecord := rp.Records[len(rp.Records)-1] @@ -422,7 +500,7 @@ func (s *source) takeNBuffered(n int) (Fetch, int, bool) { drained := len(bf.Topics) == 0 if drained { - s.takeBuffered() + s.takeBuffered(nil) } return r, taken, drained } diff --git a/pkg/kgo/topics_and_partitions.go b/pkg/kgo/topics_and_partitions.go index 9e6e9e9e..91b40c10 100644 --- a/pkg/kgo/topics_and_partitions.go +++ b/pkg/kgo/topics_and_partitions.go @@ -133,6 +133,14 @@ type pausedPartitions struct { m map[int32]struct{} } +func (m pausedTopics) t(topic string) (pausedPartitions, bool) { + if len(m) == 0 { // potentially nil + return pausedPartitions{}, false + } + pps, exists := m[topic] + return pps, exists +} + func (m pausedTopics) has(topic string, partition int32) (paused bool) { if len(m) == 0 { return false