Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Runnable group should check if stopped before enqueueing #2757

Merged
merged 1 commit into from Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 14 additions & 1 deletion pkg/manager/runnable_group.go
Expand Up @@ -263,6 +263,15 @@ func (r *runnableGroup) Add(rn Runnable, ready runnableCheck) error {
r.start.Unlock()
}

// Recheck if we're stopped and hold the readlock, given that the stop and start can be called
// at the same time, we can end up in a situation where the runnable is added
// after the group is stopped and the channel is closed.
r.stop.RLock()
defer r.stop.RUnlock()
if r.stopped {
return errRunnableGroupStopped
}

// Enqueue the runnable.
r.ch <- readyRunnable
return nil
Expand All @@ -272,7 +281,11 @@ func (r *runnableGroup) Add(rn Runnable, ready runnableCheck) error {
func (r *runnableGroup) StopAndWait(ctx context.Context) {
r.stopOnce.Do(func() {
// Close the reconciler channel once we're done.
defer close(r.ch)
defer func() {
r.stop.Lock()
close(r.ch)
r.stop.Unlock()
}()

_ = r.Start(ctx)
r.stop.Lock()
Expand Down
36 changes: 36 additions & 0 deletions pkg/manager/runnable_group_test.go
Expand Up @@ -161,6 +161,42 @@ var _ = Describe("runnableGroup", func() {
}
})

It("should be able to handle adding runnables while stopping", func() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
rg := newRunnableGroup(defaultBaseContext, errCh)

go func() {
defer GinkgoRecover()
<-time.After(1 * time.Millisecond)
Expect(rg.Start(ctx)).To(Succeed())
}()
go func() {
defer GinkgoRecover()
<-time.After(1 * time.Millisecond)
ctx, cancel := context.WithCancel(context.Background())
cancel()
rg.StopAndWait(ctx)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no guarantee that this is called after rg.Start, right? Is this intended?

If I see correctly it doesn't really matter which one is called first as StopAndWait calls Start internally and Start is using once? (just to confirm)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's correct, it doesn't really matter because they can both run at the same time, the waitgroup triggers. It's a best effort to try to reproduce the failure consistently, but not a guaranteed one. Although on my mac, it did trigger the race pretty consistently

}()

for i := 0; i < 200; i++ {
go func(i int) {
defer GinkgoRecover()

<-time.After(time.Duration(i) * time.Microsecond)
Expect(rg.Add(RunnableFunc(func(c context.Context) error {
<-ctx.Done()
return nil
}), func(_ context.Context) bool {
return true
})).To(SatisfyAny(
Succeed(),
Equal(errRunnableGroupStopped),
))
}(i)
}
})

It("should not turn ready if some readiness check fail", func() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
Expand Down