kgo: be sure to use topics when other topics are paused
Follow-up to #585: there was a bug in the commit for it. If any topic
was paused, then all non-paused topics would be returned once, but they
would not be marked as fetchable after that.

I _think_ the non-fetchability would eventually be cleared on a metadata
update, _but_ the source would re-fetch from the old position. The only
way the topic would advance would be if no topics were paused after the
metadata update.

However, this behavior is confusing, and in any case this patch is required.
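
To make the symptom concrete, here is a minimal consume-loop sketch against the public API (illustrative only, not part of the commit; it assumes a local broker at localhost:9092 and two pre-created topics "a" and "b"):

	package main

	import (
		"context"
		"fmt"

		"github.com/twmb/franz-go/pkg/kgo"
	)

	func main() {
		cl, err := kgo.NewClient(
			kgo.SeedBrokers("localhost:9092"), // assumed broker address
			kgo.ConsumeTopics("a", "b"),       // assumed pre-created topics
		)
		if err != nil {
			panic(err)
		}
		defer cl.Close()

		// Pause topic "a"; topic "b" should keep advancing regardless.
		cl.PauseFetchTopics("a")

		for i := 0; i < 10; i++ {
			fs := cl.PollFetches(context.Background())
			fs.EachRecord(func(r *kgo.Record) {
				fmt.Println(r.Topic, r.Partition, r.Offset)
			})
			// Before this patch, "b" could be returned once and then
			// stall, because its cursors were never marked fetchable
			// again while "a" stayed paused.
		}
	}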

This also patches a second bug in PollFetches with pausing: if a topic
has a paused partition, but the fetch response does NOT contain any
paused partitions, the logic would strip the entire topic from the
response.
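
For the partition-level case, a sketch of what a caller would have observed (hypothetical setup: cl is a *kgo.Client as in the sketch above, consuming topic "a" with partitions 0 and 1):

	// Pause only partition 0; partition 1 should keep flowing.
	cl.PauseFetchPartitions(map[string][]int32{"a": {0}})

	fs := cl.PollFetches(context.Background())
	fs.EachRecord(func(r *kgo.Record) {
		// Expected: partition 1's records still flow. Buggy behavior:
		// if the buffered fetch happened to contain no partition-0
		// data, the whole topic was stripped and partition 1's
		// records were dropped from the poll.
		fmt.Println(r.Partition, r.Offset)
	})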

The pause tests have been strengthened a good bit -- all lines but one
are hit, and the one line that is not hit could be hit more easily if
more partitions were added to the topic or a cluster of size one were
used. The line is currently not hit because it requires one paused
partition and one unpaused partition to be returned from the same broker
at the same time.

Lastly, this adds an error reason to the debug logging of why a list or
epoch load is reloading, which was used briefly while investigating test
slowness.
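
The mechanism, visible in the consumer.go diff below, is a small function-local struct embedding both the offset and the error, so the debug log's reloading map prints the reason next to the position. A standalone sketch of why the embedding works (the error text here is made up):

	package main

	import (
		"errors"
		"fmt"

		"github.com/twmb/franz-go/pkg/kgo"
	)

	// In the commit, this type is declared inside the function and embeds
	// the package-local EpochOffset; the shape is the same.
	type epochOffsetWhy struct {
		kgo.EpochOffset
		error
	}

	func main() {
		why := epochOffsetWhy{kgo.EpochOffset{Epoch: 3, Offset: 100}, errors.New("NOT_LEADER_OR_FOLLOWER")}
		fmt.Printf("%v\n", why) // {{3 100} NOT_LEADER_OR_FOLLOWER}
	}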
twmb committed Oct 31, 2023
1 parent 6ebcb43 commit af5bc1f
Showing 3 changed files with 163 additions and 34 deletions.
14 changes: 10 additions & 4 deletions pkg/kgo/consumer.go
@@ -1763,10 +1763,16 @@ func (s *consumerSession) handleListOrEpochResults(loaded loadedOffsets) (reload
 	// guard this entire function.
 
 	debug := s.c.cl.cfg.logger.Level() >= LogLevelDebug
-	var using, reloading map[string]map[int32]EpochOffset
+
+	var using map[string]map[int32]EpochOffset
+	type epochOffsetWhy struct {
+		EpochOffset
+		error
+	}
+	var reloading map[string]map[int32]epochOffsetWhy
 	if debug {
 		using = make(map[string]map[int32]EpochOffset)
-		reloading = make(map[string]map[int32]EpochOffset)
+		reloading = make(map[string]map[int32]epochOffsetWhy)
 		defer func() {
 			t := "list"
 			if loaded.loadType == loadTypeEpoch {
@@ -1818,10 +1824,10 @@ func (s *consumerSession) handleListOrEpochResults(loaded loadedOffsets) (reload
 			if debug {
 				treloading := reloading[load.topic]
 				if treloading == nil {
-					treloading = make(map[int32]EpochOffset)
+					treloading = make(map[int32]epochOffsetWhy)
 					reloading[load.topic] = treloading
 				}
-				treloading[load.partition] = EpochOffset{load.leaderEpoch, load.offset}
+				treloading[load.partition] = epochOffsetWhy{EpochOffset{load.leaderEpoch, load.offset}, load.err}
 			}
 		}
 	}
156 changes: 130 additions & 26 deletions pkg/kgo/consumer_direct_test.go
@@ -263,10 +263,19 @@ func TestAddRemovePartitions(t *testing.T) {
 	}
 }
 
+func closed(ch <-chan struct{}) bool {
+	select {
+	case <-ch:
+		return true
+	default:
+		return false
+	}
+}
+
 func TestPauseIssue489(t *testing.T) {
 	t.Parallel()
 
-	t1, cleanup := tmpTopicPartitions(t, 2)
+	t1, cleanup := tmpTopicPartitions(t, 3)
 	defer cleanup()
 
 	cl, _ := NewClient(
@@ -282,47 +291,142 @@ func TestPauseIssue489(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	go func() {
 		var exit atomic.Bool
-		var zeroOne uint8
+		var which uint8
 		for !exit.Load() {
 			r := StringRecord("v")
-			r.Partition = int32(zeroOne % 2)
-			zeroOne++
+			r.Partition = int32(which % 3)
+			which++
 			cl.Produce(ctx, r, func(r *Record, err error) {
 				if err == context.Canceled {
 					exit.Store(true)
 				}
 			})
 			time.Sleep(100 * time.Microsecond)
 		}
 	}()
 	defer cancel()
 
-	for i := 0; i < 10; i++ {
-		var sawZero, sawOne bool
-		for !sawZero || !sawOne {
-			fs := cl.PollFetches(ctx)
-			fs.EachRecord(func(r *Record) {
-				sawZero = sawZero || r.Partition == 0
-				sawOne = sawOne || r.Partition == 1
-			})
-		}
-		cl.PauseFetchPartitions(map[string][]int32{t1: {0}})
-		sawZero, sawOne = false, false
+	for _, pollfn := range []struct {
+		name string
+		fn   func(context.Context) Fetches
+	}{
+		{"fetches", func(ctx context.Context) Fetches { return cl.PollFetches(ctx) }},
+		{"records", func(ctx context.Context) Fetches { return cl.PollRecords(ctx, 1000) }},
+	} {
 		for i := 0; i < 10; i++ {
-			var fs Fetches
-			if i < 5 {
-				fs = cl.PollFetches(ctx)
-			} else {
-				fs = cl.PollRecords(ctx, 2)
+			ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+			var sawZero, sawOne, sawTwo bool
+			for (!sawZero || !sawOne || !sawTwo) && !closed(ctx.Done()) {
+				fs := pollfn.fn(ctx)
+				fs.EachRecord(func(r *Record) {
+					sawZero = sawZero || r.Partition == 0
+					sawOne = sawOne || r.Partition == 1
+					sawTwo = sawTwo || r.Partition == 2
+				})
 			}
-			fs.EachRecord(func(r *Record) {
-				sawZero = sawZero || r.Partition == 0
-				sawOne = sawOne || r.Partition == 1
+			cl.PauseFetchPartitions(map[string][]int32{t1: {0}})
+			sawZero, sawOne, sawTwo = false, false, false
+			for i := 0; i < 10 && !closed(ctx.Done()); i++ {
+				fs := pollfn.fn(ctx)
+				fs.EachRecord(func(r *Record) {
+					sawZero = sawZero || r.Partition == 0
+					sawOne = sawOne || r.Partition == 1
+					sawTwo = sawTwo || r.Partition == 2
				})
 			}
+			cancel()
+			if sawZero {
+				t.Fatalf("%s: saw partition zero even though it was paused", pollfn.name)
+			}
+			if !sawOne {
+				t.Fatalf("%s: did not see partition one even though it was not paused", pollfn.name)
+			}
+			if !sawTwo {
+				t.Fatalf("%s: did not see partition two even though it was not paused", pollfn.name)
+			}
+			cl.ResumeFetchPartitions(map[string][]int32{t1: {0}})
+		}
+	}
+}
+
+func TestPauseIssueOct2023(t *testing.T) {
+	t.Parallel()
+
+	t1, cleanup1 := tmpTopicPartitions(t, 1)
+	t2, cleanup2 := tmpTopicPartitions(t, 1)
+	t3, cleanup3 := tmpTopicPartitions(t, 1)
+	defer cleanup1()
+	defer cleanup2()
+	defer cleanup3()
+	ts := []string{t1, t2, t3}
+
+	cl, _ := NewClient(
+		getSeedBrokers(),
+		UnknownTopicRetries(-1),
+		ConsumeTopics(ts...),
+		MetadataMinAge(50*time.Millisecond),
+		FetchMaxWait(100*time.Millisecond),
+	)
+	defer cl.Close()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	go func() {
+		var exit atomic.Bool
+		var which int
+		for !exit.Load() {
+			r := StringRecord("v")
+			r.Topic = ts[which%len(ts)]
+			which++
+			cl.Produce(ctx, r, func(r *Record, err error) {
+				if err == context.Canceled {
+					exit.Store(true)
+				}
+			})
+			time.Sleep(100 * time.Microsecond)
+		}
-		if sawZero {
-			t.Error("saw partition zero even though it was paused")
+	}()
+	defer cancel()
+
+	for _, pollfn := range []struct {
+		name string
+		fn   func(context.Context) Fetches
+	}{
+		{"fetches", func(ctx context.Context) Fetches { return cl.PollFetches(ctx) }},
+		{"records", func(ctx context.Context) Fetches { return cl.PollRecords(ctx, 1000) }},
+	} {
+		for i := 0; i < 10; i++ {
+			ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+			var sawt1, sawt2, sawt3 bool
+			for (!sawt1 || !sawt2 || !sawt3) && !closed(ctx.Done()) {
+				fs := pollfn.fn(ctx)
+				fs.EachRecord(func(r *Record) {
+					sawt1 = sawt1 || r.Topic == t1
+					sawt2 = sawt2 || r.Topic == t2
+					sawt3 = sawt3 || r.Topic == t3
+				})
+			}
+			cl.PauseFetchTopics(t1)
+			sawt1, sawt2, sawt3 = false, false, false
+			for i := 0; i < 10 && !closed(ctx.Done()); i++ {
+				fs := pollfn.fn(ctx)
+				fs.EachRecord(func(r *Record) {
+					sawt1 = sawt1 || r.Topic == t1
+					sawt2 = sawt2 || r.Topic == t2
+					sawt3 = sawt3 || r.Topic == t3
+				})
+			}
+			cancel()
+			if sawt1 {
+				t.Fatalf("%s: saw topic t1 even though it was paused", pollfn.name)
+			}
+			if !sawt2 {
+				t.Fatalf("%s: did not see topic t2 even though it was not paused", pollfn.name)
+			}
+			if !sawt3 {
+				t.Fatalf("%s: did not see topic t3 even though it was not paused", pollfn.name)
+			}
+			cl.ResumeFetchTopics(t1)
 		}
-		cl.ResumeFetchPartitions(map[string][]int32{t1: {0}})
 	}
 }
 
27 changes: 23 additions & 4 deletions pkg/kgo/source.go
@@ -355,6 +355,10 @@ func (s *source) takeBuffered(paused pausedTopics) Fetch {
 			// and strip the topic entirely.
 			pps, ok := paused.t(t)
 			if !ok {
+				for _, o := range ps {
+					o.from.setOffset(o.cursorOffset)
+					o.from.allowUsable()
+				}
 				continue
 			}
 			if strip == nil {
@@ -368,7 +372,6 @@ func (s *source) takeBuffered(paused pausedTopics) Fetch {
 				continue
 			}
 			stript := make(map[int32]struct{})
-			strip[t] = stript
 			for _, o := range ps {
 				if _, ok := pps.m[o.from.partition]; ok {
 					o.from.allowUsable()
@@ -378,6 +381,15 @@ func (s *source) takeBuffered(paused pausedTopics) Fetch {
 				o.from.setOffset(o.cursorOffset)
 				o.from.allowUsable()
 			}
+			// We only add stript to strip if there are any
+			// stripped partitions. We could have a paused
+			// partition that is on another broker, while this
+			// broker has no paused partitions -- if we add stript
+			// here, our logic below (stripping this entire topic)
+			// is more confusing (present nil vs. non-present nil).
+			if len(stript) > 0 {
+				strip[t] = stript
+			}
 		}
 	})
 	if strip != nil {
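
The "present nil vs. non-present nil" comment above refers to a Go map subtlety worth spelling out: a key can map to a nil or empty value while still existing, so presence checks behave differently than when the key was never added. A generic, runnable sketch, independent of any kgo type:

	package main

	import "fmt"

	func main() {
		strip := map[string]map[int32]struct{}{}
		strip["t"] = nil // key exists even though its value is nil
		_, ok := strip["t"]
		fmt.Println(ok, len(strip)) // true 1 -- topic "t" already looks "stripped"
	}

Adding stript only when len(stript) > 0 keeps "topic present in strip" synonymous with "this broker actually stripped partitions of that topic".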
@@ -435,9 +447,15 @@ func (s *source) takeNBuffered(paused pausedTopics, n int) (Fetch, int, bool) {
 			continue
 		}
 
-		r.Topics = append(r.Topics, *t)
-		rt := &r.Topics[len(r.Topics)-1]
-		rt.Partitions = nil
+		var rt *FetchTopic
+		ensureTopicAdded := func() {
+			if rt != nil {
+				return
+			}
+			r.Topics = append(r.Topics, *t)
+			rt = &r.Topics[len(r.Topics)-1]
+			rt.Partitions = nil
+		}
 
 		tCursors := b.usedOffsets[t.Topic]
 
@@ -455,6 +473,7 @@ func (s *source) takeNBuffered(paused pausedTopics, n int) (Fetch, int, bool) {
 				continue
 			}
 
+			ensureTopicAdded()
 			rt.Partitions = append(rt.Partitions, *p)
 			rp := &rt.Partitions[len(rt.Partitions)-1]
 
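The ensureTopicAdded closure defers appending the topic until a partition is actually kept. Previously the topic was appended eagerly, so a poll could surface a topic with zero partitions when every buffered partition of that topic was paused. A sketch of the symptom at the API surface (illustrative, reusing cl and ctx from the earlier sketches):

	fs := cl.PollRecords(ctx, 1000)
	fs.EachTopic(func(ft kgo.FetchTopic) {
		// Before the fix, ft could show up here with an empty
		// ft.Partitions slice when all of the topic's buffered
		// partitions were paused; now the topic is only added once
		// at least one partition survives the pause filter.
		fmt.Println(ft.Topic, len(ft.Partitions))
	})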
