Skip to content

Commit af5bc1f

Browse files
committed Oct 31, 2023
kgo: be sure to use topics when other topics are paused
Follow up from #585; there was a bug in the commit for it. If any topic was paused, then all non-paused topics would be returned once, but they would not be marked as fetchable after that. I _think_ the non-fetchability would eventually be cleared on a metadata update, _but_ the source would re-fetch from the old position again. The only way the topic would advance would be if no topics were paused after the metadata update. However, this is a bit confusing, and overall this patch is required. This also patches a second bug in PollFetches with pausing: if a topic has a paused partition, and the fetch response does NOT contain any paused partitions, then the logic would actually strip the entire topic. The pause tests have been strengthened a good bit -- all lines but one are hit, and the one line that is not hit could more easily be hit if more partitions are added to the topic / a cluster of size one is used. The line is currently not hit because it requires one paused partition and one unpaused partition to be returned from the same broker at the same time. Lastly, this adds an error reason to why list or epoch is reloading, which was used briefly while investigating test slowness.
1 parent 6ebcb43 commit af5bc1f

File tree

3 files changed

+163
-34
lines changed

3 files changed

+163
-34
lines changed
 

‎pkg/kgo/consumer.go

+10-4
Original file line numberDiff line numberDiff line change
@@ -1763,10 +1763,16 @@ func (s *consumerSession) handleListOrEpochResults(loaded loadedOffsets) (reload
17631763
// guard this entire function.
17641764

17651765
debug := s.c.cl.cfg.logger.Level() >= LogLevelDebug
1766-
var using, reloading map[string]map[int32]EpochOffset
1766+
1767+
var using map[string]map[int32]EpochOffset
1768+
type epochOffsetWhy struct {
1769+
EpochOffset
1770+
error
1771+
}
1772+
var reloading map[string]map[int32]epochOffsetWhy
17671773
if debug {
17681774
using = make(map[string]map[int32]EpochOffset)
1769-
reloading = make(map[string]map[int32]EpochOffset)
1775+
reloading = make(map[string]map[int32]epochOffsetWhy)
17701776
defer func() {
17711777
t := "list"
17721778
if loaded.loadType == loadTypeEpoch {
@@ -1818,10 +1824,10 @@ func (s *consumerSession) handleListOrEpochResults(loaded loadedOffsets) (reload
18181824
if debug {
18191825
treloading := reloading[load.topic]
18201826
if treloading == nil {
1821-
treloading = make(map[int32]EpochOffset)
1827+
treloading = make(map[int32]epochOffsetWhy)
18221828
reloading[load.topic] = treloading
18231829
}
1824-
treloading[load.partition] = EpochOffset{load.leaderEpoch, load.offset}
1830+
treloading[load.partition] = epochOffsetWhy{EpochOffset{load.leaderEpoch, load.offset}, load.err}
18251831
}
18261832
}
18271833
}

‎pkg/kgo/consumer_direct_test.go

+130-26
Original file line numberDiff line numberDiff line change
@@ -263,10 +263,19 @@ func TestAddRemovePartitions(t *testing.T) {
263263
}
264264
}
265265

266+
func closed(ch <-chan struct{}) bool {
267+
select {
268+
case <-ch:
269+
return true
270+
default:
271+
return false
272+
}
273+
}
274+
266275
func TestPauseIssue489(t *testing.T) {
267276
t.Parallel()
268277

269-
t1, cleanup := tmpTopicPartitions(t, 2)
278+
t1, cleanup := tmpTopicPartitions(t, 3)
270279
defer cleanup()
271280

272281
cl, _ := NewClient(
@@ -282,47 +291,142 @@ func TestPauseIssue489(t *testing.T) {
282291
ctx, cancel := context.WithCancel(context.Background())
283292
go func() {
284293
var exit atomic.Bool
285-
var zeroOne uint8
294+
var which uint8
286295
for !exit.Load() {
287296
r := StringRecord("v")
288-
r.Partition = int32(zeroOne % 2)
289-
zeroOne++
297+
r.Partition = int32(which % 3)
298+
which++
290299
cl.Produce(ctx, r, func(r *Record, err error) {
291300
if err == context.Canceled {
292301
exit.Store(true)
293302
}
294303
})
304+
time.Sleep(100 * time.Microsecond)
295305
}
296306
}()
297307
defer cancel()
298308

299-
for i := 0; i < 10; i++ {
300-
var sawZero, sawOne bool
301-
for !sawZero || !sawOne {
302-
fs := cl.PollFetches(ctx)
303-
fs.EachRecord(func(r *Record) {
304-
sawZero = sawZero || r.Partition == 0
305-
sawOne = sawOne || r.Partition == 1
306-
})
307-
}
308-
cl.PauseFetchPartitions(map[string][]int32{t1: {0}})
309-
sawZero, sawOne = false, false
309+
for _, pollfn := range []struct {
310+
name string
311+
fn func(context.Context) Fetches
312+
}{
313+
{"fetches", func(ctx context.Context) Fetches { return cl.PollFetches(ctx) }},
314+
{"records", func(ctx context.Context) Fetches { return cl.PollRecords(ctx, 1000) }},
315+
} {
310316
for i := 0; i < 10; i++ {
311-
var fs Fetches
312-
if i < 5 {
313-
fs = cl.PollFetches(ctx)
314-
} else {
315-
fs = cl.PollRecords(ctx, 2)
317+
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
318+
var sawZero, sawOne, sawTwo bool
319+
for (!sawZero || !sawOne || !sawTwo) && !closed(ctx.Done()) {
320+
fs := pollfn.fn(ctx)
321+
fs.EachRecord(func(r *Record) {
322+
sawZero = sawZero || r.Partition == 0
323+
sawOne = sawOne || r.Partition == 1
324+
sawTwo = sawTwo || r.Partition == 2
325+
})
316326
}
317-
fs.EachRecord(func(r *Record) {
318-
sawZero = sawZero || r.Partition == 0
319-
sawOne = sawOne || r.Partition == 1
327+
cl.PauseFetchPartitions(map[string][]int32{t1: {0}})
328+
sawZero, sawOne, sawTwo = false, false, false
329+
for i := 0; i < 10 && !closed(ctx.Done()); i++ {
330+
fs := pollfn.fn(ctx)
331+
fs.EachRecord(func(r *Record) {
332+
sawZero = sawZero || r.Partition == 0
333+
sawOne = sawOne || r.Partition == 1
334+
sawTwo = sawTwo || r.Partition == 2
335+
})
336+
}
337+
cancel()
338+
if sawZero {
339+
t.Fatalf("%s: saw partition zero even though it was paused", pollfn.name)
340+
}
341+
if !sawOne {
342+
t.Fatalf("%s: did not see partition one even though it was not paused", pollfn.name)
343+
}
344+
if !sawTwo {
345+
t.Fatalf("%s: did not see partition two even though it was not paused", pollfn.name)
346+
}
347+
cl.ResumeFetchPartitions(map[string][]int32{t1: {0}})
348+
}
349+
}
350+
}
351+
352+
func TestPauseIssueOct2023(t *testing.T) {
353+
t.Parallel()
354+
355+
t1, cleanup1 := tmpTopicPartitions(t, 1)
356+
t2, cleanup2 := tmpTopicPartitions(t, 1)
357+
t3, cleanup3 := tmpTopicPartitions(t, 1)
358+
defer cleanup1()
359+
defer cleanup2()
360+
defer cleanup3()
361+
ts := []string{t1, t2, t3}
362+
363+
cl, _ := NewClient(
364+
getSeedBrokers(),
365+
UnknownTopicRetries(-1),
366+
ConsumeTopics(ts...),
367+
MetadataMinAge(50*time.Millisecond),
368+
FetchMaxWait(100*time.Millisecond),
369+
)
370+
defer cl.Close()
371+
372+
ctx, cancel := context.WithCancel(context.Background())
373+
go func() {
374+
var exit atomic.Bool
375+
var which int
376+
for !exit.Load() {
377+
r := StringRecord("v")
378+
r.Topic = ts[which%len(ts)]
379+
which++
380+
cl.Produce(ctx, r, func(r *Record, err error) {
381+
if err == context.Canceled {
382+
exit.Store(true)
383+
}
320384
})
385+
time.Sleep(100 * time.Microsecond)
321386
}
322-
if sawZero {
323-
t.Error("saw partition zero even though it was paused")
387+
}()
388+
defer cancel()
389+
390+
for _, pollfn := range []struct {
391+
name string
392+
fn func(context.Context) Fetches
393+
}{
394+
{"fetches", func(ctx context.Context) Fetches { return cl.PollFetches(ctx) }},
395+
{"records", func(ctx context.Context) Fetches { return cl.PollRecords(ctx, 1000) }},
396+
} {
397+
for i := 0; i < 10; i++ {
398+
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
399+
var sawt1, sawt2, sawt3 bool
400+
for (!sawt1 || !sawt2 || !sawt3) && !closed(ctx.Done()) {
401+
fs := pollfn.fn(ctx)
402+
fs.EachRecord(func(r *Record) {
403+
sawt1 = sawt1 || r.Topic == t1
404+
sawt2 = sawt2 || r.Topic == t2
405+
sawt3 = sawt3 || r.Topic == t3
406+
})
407+
}
408+
cl.PauseFetchTopics(t1)
409+
sawt1, sawt2, sawt3 = false, false, false
410+
for i := 0; i < 10 && !closed(ctx.Done()); i++ {
411+
fs := pollfn.fn(ctx)
412+
fs.EachRecord(func(r *Record) {
413+
sawt1 = sawt1 || r.Topic == t1
414+
sawt2 = sawt2 || r.Topic == t2
415+
sawt3 = sawt3 || r.Topic == t3
416+
})
417+
}
418+
cancel()
419+
if sawt1 {
420+
t.Fatalf("%s: saw topic t1 even though it was paused", pollfn.name)
421+
}
422+
if !sawt2 {
423+
t.Fatalf("%s: did not see topic t2 even though it was not paused", pollfn.name)
424+
}
425+
if !sawt3 {
426+
t.Fatalf("%s: did not see topic t3 even though it was not paused", pollfn.name)
427+
}
428+
cl.ResumeFetchTopics(t1)
324429
}
325-
cl.ResumeFetchPartitions(map[string][]int32{t1: {0}})
326430
}
327431
}
328432

‎pkg/kgo/source.go

+23-4
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,10 @@ func (s *source) takeBuffered(paused pausedTopics) Fetch {
355355
// and strip the topic entirely.
356356
pps, ok := paused.t(t)
357357
if !ok {
358+
for _, o := range ps {
359+
o.from.setOffset(o.cursorOffset)
360+
o.from.allowUsable()
361+
}
358362
continue
359363
}
360364
if strip == nil {
@@ -368,7 +372,6 @@ func (s *source) takeBuffered(paused pausedTopics) Fetch {
368372
continue
369373
}
370374
stript := make(map[int32]struct{})
371-
strip[t] = stript
372375
for _, o := range ps {
373376
if _, ok := pps.m[o.from.partition]; ok {
374377
o.from.allowUsable()
@@ -378,6 +381,15 @@ func (s *source) takeBuffered(paused pausedTopics) Fetch {
378381
o.from.setOffset(o.cursorOffset)
379382
o.from.allowUsable()
380383
}
384+
// We only add stript to strip if there are any
385+
// stripped partitions. We could have a paused
386+
// partition that is on another broker, while this
387+
// broker has no paused partitions -- if we add stript
388+
// here, our logic below (stripping this entire topic)
389+
// is more confusing (present nil vs. non-present nil).
390+
if len(stript) > 0 {
391+
strip[t] = stript
392+
}
381393
}
382394
})
383395
if strip != nil {
@@ -435,9 +447,15 @@ func (s *source) takeNBuffered(paused pausedTopics, n int) (Fetch, int, bool) {
435447
continue
436448
}
437449

438-
r.Topics = append(r.Topics, *t)
439-
rt := &r.Topics[len(r.Topics)-1]
440-
rt.Partitions = nil
450+
var rt *FetchTopic
451+
ensureTopicAdded := func() {
452+
if rt != nil {
453+
return
454+
}
455+
r.Topics = append(r.Topics, *t)
456+
rt = &r.Topics[len(r.Topics)-1]
457+
rt.Partitions = nil
458+
}
441459

442460
tCursors := b.usedOffsets[t.Topic]
443461

@@ -455,6 +473,7 @@ func (s *source) takeNBuffered(paused pausedTopics, n int) (Fetch, int, bool) {
455473
continue
456474
}
457475

476+
ensureTopicAdded()
458477
rt.Partitions = append(rt.Partitions, *p)
459478
rp := &rt.Partitions[len(rt.Partitions)-1]
460479

0 commit comments

Comments
 (0)
Please sign in to comment.