Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

⚠Only allow sources to start once #2686

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

inteon
Copy link
Member

@inteon inteon commented Feb 13, 2024

Currently, only the Informer and Channel source can be started more than once (Start function can be called twice on same object).
I do believe however that this pattern is flawed, each source should only be started once, so that the context passed to that Start function is only used to stop that one instance from running and all the properties in the source belong to that single instance. The Channel source, for example, currently stops all sources that were started from the same instance when the first context passed to Start gets canceled (see #2686 (comment)).

For the Channel source, allowing the source to start only once, requires the API to be altered, since we want to make it possible to listen to the same channel using multiple Channel sources (matching the current behavior). This requires a new Channel Broadcaster to be introduced.

@k8s-ci-robot k8s-ci-robot added the cncf-cla: yes Indicates the PR's author has signed the CNCF CLA. label Feb 13, 2024
@k8s-ci-robot
Copy link
Contributor

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by: inteon
Once this PR has been reviewed and has the lgtm label, please assign sbueringer for approval. For more information see the Kubernetes Code Review Process.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@k8s-ci-robot k8s-ci-robot added the size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. label Feb 13, 2024
@inteon inteon force-pushed the source_start_once branch 3 times, most recently from 0cebcae to bde0a5e Compare February 13, 2024 22:00
@inteon inteon changed the title ⚠Only allow sources to only start once ⚠Only allow sources to start once Feb 20, 2024
@k8s-ci-robot k8s-ci-robot added the needs-rebase Indicates a PR cannot be merged because it has merge conflicts with HEAD. label Mar 26, 2024

cs.once.Do(func() {
// Distribute GenericEvents to all EventHandler / Queue pairs Watching this source
go cs.syncLoop(ctx)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the main issue that this PR solves: here, the context passed to the first call of Start is used to manage the goroutine that is used for all future calls to Start.

@k8s-ci-robot k8s-ci-robot removed the needs-rebase Indicates a PR cannot be merged because it has merge conflicts with HEAD. label Apr 23, 2024
@k8s-ci-robot k8s-ci-robot added size/L Denotes a PR that changes 100-499 lines, ignoring generated files. and removed size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. labels Apr 23, 2024
Signed-off-by: Tim Ramlot <42113979+inteon@users.noreply.github.com>
Signed-off-by: Tim Ramlot <42113979+inteon@users.noreply.github.com>
@k8s-ci-robot
Copy link
Contributor

k8s-ci-robot commented Apr 23, 2024

@inteon: The following test failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name Commit Details Required Rerun command
pull-controller-runtime-apidiff 7abbd42 link false /test pull-controller-runtime-apidiff

Full PR test history. Your PR dashboard. Please help us cut down on flakes by linking to an open issue when you hit one in your PR.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. I understand the commands that are listed here.

@sbueringer
Copy link
Member

sbueringer commented Apr 23, 2024

I'm really having some trouble understanding what the goal for users of controller-runtime is with this PR.

What use case are we trying to solve?

@inteon
Copy link
Member Author

inteon commented Apr 23, 2024

@sbueringer Below is a small example test to illustrate the issue at hand.
I start two controllers which both listen to the same channel.
If the first controller is stopped (context passed to Start is canceled), the second controller also stops receiving events.

On master (second controller does NOT receive any events):

func TestMultiStart(t *testing.T) {
	rootCtx := context.Background()

	watchChan := make(chan event.GenericEvent, 10)
	watch := source.Channel(watchChan, &handler.EnqueueRequestForObject{})

	m, err := manager.New(&rest.Config{}, manager.Options{})
	require.NoError(t, err)

	// Controller 1 is started and stopped directly
	{
		c1, err := controller.New("controller-1", m, controller.Options{Reconciler: reconcile.Func(func(ctx context.Context, r reconcile.Request) (reconcile.Result, error) {
			t.Fail() // Reconcilder should not get called

			return reconcile.Result{}, nil
		})})
		require.NoError(t, err)
		require.NoError(t, c1.Watch(watch))
		ctx1, cancel1 := context.WithCancel(rootCtx)
		cancel1()
		require.NoError(t, c1.Start(ctx1))
	}

	// Controller 2 is listening on the same channel
	callCount := int32(0)
	c2, err := controller.New("controller-2", m, controller.Options{Reconciler: reconcile.Func(func(ctx context.Context, o reconcile.Request) (reconcile.Result, error) {
		t.Log("got event!", atomic.AddInt32(&callCount, 1))

		return reconcile.Result{}, nil
	})})
	require.NoError(t, err)
	require.NoError(t, c2.Watch(watch))

	ctx2, stop := context.WithTimeout(rootCtx, time.Second*5)
	defer stop()
	done := make(chan struct{})
	go func() {
		defer close(done)
		require.NoError(t, c2.Start(ctx2))
	}()

	// Start sending messages on the channel
	watchChan <- event.GenericEvent{Object: &corev1.Pod{ObjectMeta: v1.ObjectMeta{Name: "Test1"}}}
	watchChan <- event.GenericEvent{Object: &corev1.Pod{ObjectMeta: v1.ObjectMeta{Name: "Test2"}}}
	watchChan <- event.GenericEvent{Object: &corev1.Pod{ObjectMeta: v1.ObjectMeta{Name: "Test3"}}}

	<-done

	// We waited 5 seconds, which should be enough to have at least received
	// one of the events on the channel
	require.GreaterOrEqual(t, callCount, int32(1))
}
$ go test -count=5 -timeout 30s -run ^TestMultiStart$ sigs.k8s.io/controller-runtime
--- FAIL: TestMultiStart (5.00s)
    aa_test.go:71: 
                Error Trace:    /home/tramlot/projects/controller-runtime/aa_test.go:71
                Error:          "0" is not greater than or equal to "1"
                Test:           TestMultiStart
--- FAIL: TestMultiStart (5.00s)
    aa_test.go:71: 
                Error Trace:    /home/tramlot/projects/controller-runtime/aa_test.go:71
                Error:          "0" is not greater than or equal to "1"
                Test:           TestMultiStart
--- FAIL: TestMultiStart (5.00s)
    aa_test.go:71: 
                Error Trace:    /home/tramlot/projects/controller-runtime/aa_test.go:71
                Error:          "0" is not greater than or equal to "1"
                Test:           TestMultiStart
--- FAIL: TestMultiStart (5.00s)
    aa_test.go:71: 
                Error Trace:    /home/tramlot/projects/controller-runtime/aa_test.go:71
                Error:          "0" is not greater than or equal to "1"
                Test:           TestMultiStart
--- FAIL: TestMultiStart (5.00s)
    aa_test.go:71: 
                Error Trace:    /home/tramlot/projects/controller-runtime/aa_test.go:71
                Error:          "0" is not greater than or equal to "1"
                Test:           TestMultiStart
FAIL
FAIL    sigs.k8s.io/controller-runtime  25.019s
FAIL

With this PR (second controller DOES receive any events):

func TestMultiStart(t *testing.T) {
	rootCtx := context.Background()

	watchChan := make(chan event.GenericEvent, 10)
	broadcaster := source.NewChannelBroadcaster(watchChan)
	watch1 := source.Channel(broadcaster, &handler.EnqueueRequestForObject{})
	watch2 := source.Channel(broadcaster, &handler.EnqueueRequestForObject{})

	m, err := manager.New(&rest.Config{}, manager.Options{})
	require.NoError(t, err)

	// Controller 1 is started and stopped directly
	{
		c1, err := controller.New("controller-1", m, controller.Options{Reconciler: reconcile.Func(func(ctx context.Context, r reconcile.Request) (reconcile.Result, error) {
			t.Fail() // Reconcilder should not get called

			return reconcile.Result{}, nil
		})})
		require.NoError(t, err)
		require.NoError(t, c1.Watch(watch1))
		ctx1, cancel1 := context.WithCancel(rootCtx)
		cancel1()
		require.NoError(t, c1.Start(ctx1))
	}

	// Controller 2 is listening on the same channel
	callCount := int32(0)
	c2, err := controller.New("controller-2", m, controller.Options{Reconciler: reconcile.Func(func(ctx context.Context, o reconcile.Request) (reconcile.Result, error) {
		t.Log("got event!", atomic.AddInt32(&callCount, 1))

		return reconcile.Result{}, nil
	})})
	require.NoError(t, err)
	require.NoError(t, c2.Watch(watch2))

	ctx2, stop := context.WithTimeout(rootCtx, time.Second*5)
	defer stop()
	done := make(chan struct{})
	go func() {
		defer close(done)
		require.NoError(t, c2.Start(ctx2))
	}()

	// Start sending messages on the channel
	watchChan <- event.GenericEvent{Object: &corev1.Pod{ObjectMeta: v1.ObjectMeta{Name: "Test1"}}}
	watchChan <- event.GenericEvent{Object: &corev1.Pod{ObjectMeta: v1.ObjectMeta{Name: "Test2"}}}
	watchChan <- event.GenericEvent{Object: &corev1.Pod{ObjectMeta: v1.ObjectMeta{Name: "Test3"}}}

	<-done

	// We waited 5 seconds, which should be enough to have at least received
	// one of the events on the channel
	require.GreaterOrEqual(t, callCount, int32(1))
}
$ go test -count=5 -timeout 30s -run ^TestMultiStart$ sigs.k8s.io/controller-runtime
ok      sigs.k8s.io/controller-runtime  25.017s

@sbueringer
Copy link
Member

sbueringer commented Apr 23, 2024

I get the point that calling Start twice on the same Channel is problematic because of the strange behavior on cancel.

I'm not sure if that necessarily means that we should have the channelBroadcaster implementation within controller-runtime to support having multiple Sources with the same chan.

@inteon
Copy link
Member Author

inteon commented Apr 23, 2024

I get the point that calling Start twice on the same Channel is problematic because of the strange behavior on cancel.

I'm not sure if that necessarily means that we should have the channelBroadcaster implementation within controller-runtime to support having multiple Sources on the same chan.

We can also say that 1 channel source = 1 channel.
However, that means we drop the broadcasting feature that we have today.

@sbueringer
Copy link
Member

sbueringer commented Apr 23, 2024

We can also say that 1 channel source = 1 channel.
However, that means we drop the broadcasting feature that we have today.

I'm not sure if that is/was an intended feature or it just mostly worked (minus the cancel behavior of the context).
(I just don't have the history on that to be honest).

Let's see what @alvaroaleman thinks.

@alvaroaleman
Copy link
Member

I'm not sure if that is/was an intended feature or it just mostly worked (minus the cancel behavior of the context).
(I just don't have the history on that to be honest).

I don't know either if this was intentional, but the cancel behavior means that this never worked correctly. This change means that the UX of source.Channel gets worse for everyone to support something that never worked properly to begin with. If someone wants this (and we don't know if that is the case), they should multiplex a single source channel onto multiple target channels themselves.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
cncf-cla: yes Indicates the PR's author has signed the CNCF CLA. size/L Denotes a PR that changes 100-499 lines, ignoring generated files.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants