
kgo: do not cancel FindCoordinator if the parent context cancels #650

Merged
merged 1 commit into master from find_coordinator_ctx on Dec 21, 2023

Conversation

twmb (Owner) commented Dec 21, 2023

Some load testing in Redpanda showed a failure where consuming quit unexpectedly and unrecoverably.

The sequence of events is:

  • if OffsetCommit is issued just before Heartbeat
  • and the group needs to be loaded so FindCoordinator is triggered,
  • and OffsetCommit happens again, canceling the prior commit's context

Then,

  • FindCoordinator would cancel
  • Heartbeat, which is waiting on the same load, would fail with context.Canceled
  • This error is seen as a group leave error
  • The group management logic would quit entirely.

Now, the context used for FindCoordinator is the client context, which is only closed on client close. This is also better in general: if two requests are waiting on the same coordinator load, we don't want the first request's cancelation to fail the second request. If all requests cancel and a stray FindCoordinator is left in flight, that's fine too; worst case, we end up caching a little extra coordinator data that is likely needed in the future anyway.
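
As a rough illustration of that change (hypothetical names throughout; this is not the real kgo internals), the sketch below shares one in-flight coordinator load across waiters and drives it with a client-lifetime context, so a waiter canceling only abandons its own wait and never tears down the load another waiter depends on:

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// loader is a toy stand-in for the client's coordinator cache.
type loader struct {
	clientCtx context.Context // closed only when the client closes

	mu     sync.Mutex
	loads  map[string]chan struct{} // group -> signal that a load finished
	cached map[string]int32         // group -> coordinator node
}

// findCoordinator stands in for issuing the real FindCoordinator request.
func (l *loader) findCoordinator(ctx context.Context, group string, done chan struct{}) {
	defer close(done)
	select {
	case <-time.After(100 * time.Millisecond): // pretend network round trip
		l.mu.Lock()
		l.cached[group] = 1 // hypothetical coordinator node
		delete(l.loads, group)
		l.mu.Unlock()
	case <-ctx.Done(): // only a client close stops the load
	}
}

// coordinator returns the coordinator for group, sharing one in-flight load.
func (l *loader) coordinator(waiterCtx context.Context, group string) (int32, error) {
	l.mu.Lock()
	if node, ok := l.cached[group]; ok {
		l.mu.Unlock()
		return node, nil
	}
	done, loading := l.loads[group]
	if !loading {
		done = make(chan struct{})
		l.loads[group] = done
		// The key change: the load runs under l.clientCtx, not waiterCtx, so
		// a canceled commit cannot fail a heartbeat waiting on the same load.
		go l.findCoordinator(l.clientCtx, group, done)
	}
	l.mu.Unlock()

	select {
	case <-done:
		l.mu.Lock()
		defer l.mu.Unlock()
		if node, ok := l.cached[group]; ok {
			return node, nil
		}
		return 0, errors.New("client closed")
	case <-waiterCtx.Done():
		// Only this waiter gives up; the load keeps running and its result
		// stays cached for whoever asks next.
		return 0, waiterCtx.Err()
	}
}

func main() {
	l := &loader{
		clientCtx: context.Background(),
		loads:     map[string]chan struct{}{},
		cached:    map[string]int32{},
	}

	// The "commit" waiter cancels early; the "heartbeat" waiter still succeeds.
	commitCtx, cancel := context.WithCancel(context.Background())
	go func() { time.Sleep(10 * time.Millisecond); cancel() }()

	_, err := l.coordinator(commitCtx, "g")
	fmt.Println("commit waiter:", err) // context canceled

	node, err := l.coordinator(context.Background(), "g")
	fmt.Println("heartbeat waiter:", node, err) // 1 <nil>
}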

Closes redpanda-data/redpanda#15131

twmb merged commit d269dad into master Dec 21, 2023
6 checks passed
twmb deleted the find_coordinator_ctx branch December 21, 2023 02:21
twmb (Owner, Author) commented Dec 21, 2023

The test to trigger this failure before the patch, and to ensure it no longer happens after the patch:

package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"sync"
	"time"

	"github.com/twmb/franz-go/pkg/kfake"
	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kmsg"
)

func chk(err error) {
	if err != nil {
		panic(err)
	}
}

func main() {
	f, err := kfake.NewCluster(
		kfake.Ports(9092, 9093, 9094),
		kfake.SeedTopics(-1, "foo"),
		kfake.SleepOutOfOrder(),
	)
	chk(err)
	defer f.Close()

	f.ControlKey(int16(kmsg.OffsetCommit), func(req kmsg.Request) (kmsg.Response, error, bool) {
		// We see a commit. We want this to stack a heartbeat request behind us.
		f.DropControl()
		fmt.Println("|||||| In offset commit")
		r := req.(*kmsg.OffsetCommitRequest)
		sawHB := make(chan struct{})
		f.ControlKey(int16(kmsg.Heartbeat), func(kmsg.Request) (kmsg.Response, error, bool) {
			// Now, we want to return NOT_COORDINATOR for this request,
			// triggering a FindCoordinator.
			fmt.Println("|||||| Stacked heartbeat, rehashing coordinators")
			n := f.CoordinatorFor(r.Group)
			for n == f.CoordinatorFor(r.Group) {
				fmt.Println("|||||| Rehashing coordinators...")
				f.RehashCoordinators()
			}
			f.DropControl()
			fmt.Println("|||||| Heartbeat sleeping 1s...")
			f.SleepControl(func() { time.Sleep(time.Second) })
			fmt.Println("|||||| Heartbeat allowing commit to continue...")
			close(sawHB)
			f.SleepControl(func() { time.Sleep(time.Second) })
			fmt.Println("|||||| Heartbeat now continuing...")
			return nil, nil, false
		})
		f.SleepControl(func() { <-sawHB })
		fmt.Println("|||||| Offset commit continuing")

		// Once we get FindCoordinator, we want to hang long enough
		// that the client will cancel the offset commit, which also
		// cancels the heartbeat. We need to ensure the FindCoordinator
		// request goes to a different broker than what we need
		// OffsetCommit to go to.
		var once sync.Once
		findHang := make(chan struct{})
		f.ControlKey(int16(kmsg.FindCoordinator), func(req kmsg.Request) (kmsg.Response, error, bool) {
			if f.CurrentNode() == f.CoordinatorFor(r.Group) {
				fmt.Println("|||||| Failing FindCoordinator request to trigger a retry on a different broker")
				f.KeepControl()
				return nil, errors.New("request a different broker"), true
			}
			fmt.Println("|||||| Hanging in find coordinator")
			f.DropControl()
			defer once.Do(func() { close(findHang) })
			f.SleepControl(func() {
				time.Sleep(15 * time.Second)
				fmt.Println("|||||| Waking up find coordinator")
			})
			fmt.Println("|||||| Returning from find coordinator")
			return nil, nil, false
		})
		f.ControlKey(int16(kmsg.OffsetCommit), func(r2 kmsg.Request) (kmsg.Response, error, bool) {
			if r2.(*kmsg.OffsetCommitRequest) == req.(*kmsg.OffsetCommitRequest) {
				return nil, nil, false
			}
			fmt.Println("|||||| Hanging in OffsetCommit until FindCoordinator returns")
			f.DropControl()
			f.SleepControl(func() { <-findHang })
			fmt.Println("|||||| Returning from OffsetCommit and allowing it")
			return nil, nil, false
		})
		return nil, nil, false
	})

	// Group consumer with 1s heartbeats, 5s autocommits, a 20s request
	// timeout overhead (the injected FindCoordinator hang sleeps for 15s),
	// and debug logging.
	cl, err := kgo.NewClient(
		kgo.DefaultProduceTopic("foo"),
		kgo.ConsumeTopics("foo"),
		kgo.ConsumerGroup("g"),
		kgo.AutoCommitInterval(5*time.Second),
		kgo.HeartbeatInterval(1*time.Second),
		kgo.RequestTimeoutOverhead(20*time.Second),
		kgo.WithLogger(kgo.BasicLogger(os.Stdout, kgo.LogLevelDebug, nil)),
	)
	chk(err)

	// Produce a timestamped record every second so the consumer always has
	// data to poll.
	go func() {
		for range time.Tick(time.Second) {
			_, err := cl.ProduceSync(context.Background(), kgo.StringRecord(time.Now().Format(time.TimeOnly))).First()
			chk(err)
		}
	}()

	// Consume forever, printing every record received.
	for {
		fs := cl.PollFetches(context.Background())
		fs.EachRecord(func(r *kgo.Record) {
			fmt.Printf("got record: %s\n", r.Value)
		})
	}
}
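
Before the patch, the second commit's cancelation also cancels the hanging FindCoordinator, the stacked Heartbeat fails with context.Canceled, and group management quits, so the consumer above stops receiving records; with the patch, it should keep consuming through the forced coordinator reload.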
