Skip to content

Commit 0ecb52b

Browse files
committedOct 21, 2023
kgo: do not rotate the consumer session when pausing topics/partitions
Issue #489 asked to stop returning data after a partition was paused -- the original implementation of pausing kept returning any data that was in flight or already buffered, and simply stopped fetching new data. 489 was dealt with by bumping the consumer session, which kills all in flight fetch requests. This was easy, but can cause a lot of connection churn if pausing and resuming a lot -- which is #585. The new implementation allows fetches to complete, but strips data from fetches based on what is paused at the moment the fetches are being returned to the client. This does make polling paused fetches very slightly slower (a map lookup per partition), but there's only so much that's possible. If a partition is paused, we drop the data and do not advance the internal offset. If a partition is not paused, we keep the data and return it -- same as before.
1 parent 6a961da commit 0ecb52b

File tree

4 files changed

+105
-38
lines changed

4 files changed

+105
-38
lines changed
 

‎pkg/kgo/consumer.go

+7-31
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,8 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches {
433433
}()
434434
}
435435

436+
paused := c.loadPaused()
437+
436438
// A group can grab the consumer lock then the group mu and
437439
// assign partitions. The group mu is grabbed to update its
438440
// uncommitted map. Assigning partitions clears sources ready
@@ -451,13 +453,13 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches {
451453
c.sourcesReadyMu.Lock()
452454
if maxPollRecords < 0 {
453455
for _, ready := range c.sourcesReadyForDraining {
454-
fetches = append(fetches, ready.takeBuffered())
456+
fetches = append(fetches, ready.takeBuffered(paused))
455457
}
456458
c.sourcesReadyForDraining = nil
457459
} else {
458460
for len(c.sourcesReadyForDraining) > 0 && maxPollRecords > 0 {
459461
source := c.sourcesReadyForDraining[0]
460-
fetch, taken, drained := source.takeNBuffered(maxPollRecords)
462+
fetch, taken, drained := source.takeNBuffered(paused, maxPollRecords)
461463
if drained {
462464
c.sourcesReadyForDraining = c.sourcesReadyForDraining[1:]
463465
}
@@ -555,9 +557,7 @@ func (cl *Client) UpdateFetchMaxBytes(maxBytes, maxPartBytes int32) {
555557
// PauseFetchTopics sets the client to no longer fetch the given topics and
556558
// returns all currently paused topics. Paused topics persist until resumed.
557559
// You can call this function with no topics to simply receive the list of
558-
// currently paused topics. Pausing topics drops everything currently buffered
559-
// and kills any in flight fetch requests to ensure nothing that is paused
560-
// can be returned anymore from polling.
560+
// currently paused topics.
561561
//
562562
// Pausing topics is independent from pausing individual partitions with the
563563
// PauseFetchPartitions method. If you pause partitions for a topic with
@@ -569,15 +569,8 @@ func (cl *Client) PauseFetchTopics(topics ...string) []string {
569569
if len(topics) == 0 {
570570
return c.loadPaused().pausedTopics()
571571
}
572-
573572
c.pausedMu.Lock()
574573
defer c.pausedMu.Unlock()
575-
defer func() {
576-
c.mu.Lock()
577-
defer c.mu.Unlock()
578-
c.assignPartitions(nil, assignBumpSession, nil, fmt.Sprintf("pausing fetch topics %v", topics))
579-
}()
580-
581574
paused := c.clonePaused()
582575
paused.addTopics(topics...)
583576
c.storePaused(paused)
@@ -587,9 +580,7 @@ func (cl *Client) PauseFetchTopics(topics ...string) []string {
587580
// PauseFetchPartitions sets the client to no longer fetch the given partitions
588581
// and returns all currently paused partitions. Paused partitions persist until
589582
// resumed. You can call this function with no partitions to simply receive the
590-
// list of currently paused partitions. Pausing partitions drops everything
591-
// currently buffered and kills any in flight fetch requests to ensure nothing
592-
// that is paused can be returned anymore from polling.
583+
// list of currently paused partitions.
593584
//
594585
// Pausing individual partitions is independent from pausing topics with the
595586
// PauseFetchTopics method. If you pause partitions for a topic with
@@ -601,15 +592,8 @@ func (cl *Client) PauseFetchPartitions(topicPartitions map[string][]int32) map[s
601592
if len(topicPartitions) == 0 {
602593
return c.loadPaused().pausedPartitions()
603594
}
604-
605595
c.pausedMu.Lock()
606596
defer c.pausedMu.Unlock()
607-
defer func() {
608-
c.mu.Lock()
609-
defer c.mu.Unlock()
610-
c.assignPartitions(nil, assignBumpSession, nil, fmt.Sprintf("pausing fetch partitions %v", topicPartitions))
611-
}()
612-
613597
paused := c.clonePaused()
614598
paused.addPartitions(topicPartitions)
615599
c.storePaused(paused)
@@ -884,10 +868,6 @@ const (
884868
// The counterpart to assignInvalidateMatching, assignSetMatching
885869
// resets all matching partitions to the specified offset / epoch.
886870
assignSetMatching
887-
888-
// For pausing, we want to drop anything inflight. We start a new
889-
// session with the old tps.
890-
assignBumpSession
891871
)
892872

893873
func (h assignHow) String() string {
@@ -902,8 +882,6 @@ func (h assignHow) String() string {
902882
return "unassigning and purging any partition matching the input topics"
903883
case assignSetMatching:
904884
return "reassigning any currently assigned matching partition to the input"
905-
case assignBumpSession:
906-
return "bumping internal consumer session to drop anything currently in flight"
907885
}
908886
return ""
909887
}
@@ -984,8 +962,6 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
984962
// if we had no session before, which is why we need to pass in
985963
// our topicPartitions.
986964
session = c.guardSessionChange(tps)
987-
} else if how == assignBumpSession {
988-
loadOffsets, tps = c.stopSession()
989965
} else {
990966
loadOffsets, _ = c.stopSession()
991967

@@ -1032,7 +1008,7 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
10321008
// assignment went straight to listing / epoch loading, and
10331009
// that list/epoch never finished.
10341010
switch how {
1035-
case assignWithoutInvalidating, assignBumpSession:
1011+
case assignWithoutInvalidating:
10361012
// Nothing to do -- this is handled above.
10371013
case assignInvalidateAll:
10381014
loadOffsets = listOrEpochLoads{}

‎pkg/kgo/consumer_direct_test.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -307,8 +307,13 @@ func TestPauseIssue489(t *testing.T) {
307307
}
308308
cl.PauseFetchPartitions(map[string][]int32{t1: {0}})
309309
sawZero, sawOne = false, false
310-
for i := 0; i < 5; i++ {
311-
fs := cl.PollFetches(ctx)
310+
for i := 0; i < 10; i++ {
311+
var fs Fetches
312+
if i < 5 {
313+
fs = cl.PollFetches(ctx)
314+
} else {
315+
fs = cl.PollRecords(ctx, 2)
316+
}
312317
fs.EachRecord(func(r *Record) {
313318
sawZero = sawZero || r.Partition == 0
314319
sawOne = sawOne || r.Partition == 1

‎pkg/kgo/source.go

+83-5
Original file line numberDiff line numberDiff line change
@@ -344,8 +344,64 @@ func (s *source) hook(f *Fetch, buffered, polled bool) {
344344
}
345345

346346
// takeBuffered drains a buffered fetch and updates offsets.
347-
func (s *source) takeBuffered() Fetch {
348-
return s.takeBufferedFn(true, usedOffsets.finishUsingAllWithSet)
347+
func (s *source) takeBuffered(paused pausedTopics) Fetch {
348+
if len(paused) == 0 {
349+
return s.takeBufferedFn(true, usedOffsets.finishUsingAllWithSet)
350+
}
351+
var strip map[string]map[int32]struct{}
352+
f := s.takeBufferedFn(true, func(os usedOffsets) {
353+
for t, ps := range os {
354+
// If the entire topic is paused, we allowUsable all
355+
// and strip the topic entirely.
356+
pps, ok := paused.t(t)
357+
if !ok {
358+
continue
359+
}
360+
if strip == nil {
361+
strip = make(map[string]map[int32]struct{})
362+
}
363+
if pps.all {
364+
for _, o := range ps {
365+
o.from.allowUsable()
366+
}
367+
strip[t] = nil // initialize key, for existence-but-len-0 check below
368+
continue
369+
}
370+
stript := make(map[int32]struct{})
371+
strip[t] = stript
372+
for _, o := range ps {
373+
if _, ok := pps.m[o.from.partition]; ok {
374+
o.from.allowUsable()
375+
stript[o.from.partition] = struct{}{}
376+
continue
377+
}
378+
o.from.setOffset(o.cursorOffset)
379+
o.from.allowUsable()
380+
}
381+
}
382+
})
383+
if strip != nil {
384+
keep := f.Topics[:0]
385+
for _, t := range f.Topics {
386+
stript, ok := strip[t.Topic]
387+
if ok {
388+
if len(stript) == 0 {
389+
continue // stripping this entire topic
390+
}
391+
keepp := t.Partitions[:0]
392+
for _, p := range t.Partitions {
393+
if _, ok := stript[p.Partition]; ok {
394+
continue
395+
}
396+
keepp = append(keepp, p)
397+
}
398+
t.Partitions = keepp
399+
}
400+
keep = append(keep, t)
401+
}
402+
f.Topics = keep
403+
}
404+
return f
349405
}
350406

351407
func (s *source) discardBuffered() {
@@ -359,7 +415,7 @@ func (s *source) discardBuffered() {
359415
//
360416
// This returns the number of records taken and whether the source has been
361417
// completely drained.
362-
func (s *source) takeNBuffered(n int) (Fetch, int, bool) {
418+
func (s *source) takeNBuffered(paused pausedTopics, n int) (Fetch, int, bool) {
363419
var r Fetch
364420
var taken int
365421

@@ -368,6 +424,17 @@ func (s *source) takeNBuffered(n int) (Fetch, int, bool) {
368424
for len(bf.Topics) > 0 && n > 0 {
369425
t := &bf.Topics[0]
370426

427+
// If the topic is outright paused, we allowUsable all
428+
// partitions in the topic and skip the topic entirely.
429+
if paused.has(t.Topic, -1) {
430+
bf.Topics = bf.Topics[1:]
431+
for _, pCursor := range b.usedOffsets[t.Topic] {
432+
pCursor.from.allowUsable()
433+
}
434+
delete(b.usedOffsets, t.Topic)
435+
continue
436+
}
437+
371438
r.Topics = append(r.Topics, *t)
372439
rt := &r.Topics[len(r.Topics)-1]
373440
rt.Partitions = nil
@@ -377,6 +444,17 @@ func (s *source) takeNBuffered(n int) (Fetch, int, bool) {
377444
for len(t.Partitions) > 0 && n > 0 {
378445
p := &t.Partitions[0]
379446

447+
if paused.has(t.Topic, p.Partition) {
448+
t.Partitions = t.Partitions[1:]
449+
pCursor := tCursors[p.Partition]
450+
pCursor.from.allowUsable()
451+
delete(tCursors, p.Partition)
452+
if len(tCursors) == 0 {
453+
delete(b.usedOffsets, t.Topic)
454+
}
455+
continue
456+
}
457+
380458
rt.Partitions = append(rt.Partitions, *p)
381459
rp := &rt.Partitions[len(rt.Partitions)-1]
382460

@@ -402,7 +480,7 @@ func (s *source) takeNBuffered(n int) (Fetch, int, bool) {
402480
if len(tCursors) == 0 {
403481
delete(b.usedOffsets, t.Topic)
404482
}
405-
break
483+
continue
406484
}
407485

408486
lastReturnedRecord := rp.Records[len(rp.Records)-1]
@@ -422,7 +500,7 @@ func (s *source) takeNBuffered(n int) (Fetch, int, bool) {
422500

423501
drained := len(bf.Topics) == 0
424502
if drained {
425-
s.takeBuffered()
503+
s.takeBuffered(nil)
426504
}
427505
return r, taken, drained
428506
}

‎pkg/kgo/topics_and_partitions.go

+8
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,14 @@ type pausedPartitions struct {
133133
m map[int32]struct{}
134134
}
135135

136+
func (m pausedTopics) t(topic string) (pausedPartitions, bool) {
137+
if len(m) == 0 { // potentially nil
138+
return pausedPartitions{}, false
139+
}
140+
pps, exists := m[topic]
141+
return pps, exists
142+
}
143+
136144
func (m pausedTopics) has(topic string, partition int32) (paused bool) {
137145
if len(m) == 0 {
138146
return false

0 commit comments

Comments
 (0)
Please sign in to comment.