diff --git a/pkg/manager/runnable_group.go b/pkg/manager/runnable_group.go index 96566f5df1..6060910485 100644 --- a/pkg/manager/runnable_group.go +++ b/pkg/manager/runnable_group.go @@ -263,6 +263,15 @@ func (r *runnableGroup) Add(rn Runnable, ready runnableCheck) error { r.start.Unlock() } + // Recheck if we're stopped and hold the readlock, given that the stop and start can be called + // at the same time, we can end up in a situation where the runnable is added + // after the group is stopped and the channel is closed. + r.stop.RLock() + defer r.stop.RUnlock() + if r.stopped { + return errRunnableGroupStopped + } + // Enqueue the runnable. r.ch <- readyRunnable return nil @@ -272,7 +281,11 @@ func (r *runnableGroup) Add(rn Runnable, ready runnableCheck) error { func (r *runnableGroup) StopAndWait(ctx context.Context) { r.stopOnce.Do(func() { // Close the reconciler channel once we're done. - defer close(r.ch) + defer func() { + r.stop.Lock() + close(r.ch) + r.stop.Unlock() + }() _ = r.Start(ctx) r.stop.Lock() diff --git a/pkg/manager/runnable_group_test.go b/pkg/manager/runnable_group_test.go index 9a55c4de9e..34d76ed0dc 100644 --- a/pkg/manager/runnable_group_test.go +++ b/pkg/manager/runnable_group_test.go @@ -161,6 +161,42 @@ var _ = Describe("runnableGroup", func() { } }) + It("should be able to handle adding runnables while stopping", func() { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + rg := newRunnableGroup(defaultBaseContext, errCh) + + go func() { + defer GinkgoRecover() + <-time.After(1 * time.Millisecond) + Expect(rg.Start(ctx)).To(Succeed()) + }() + go func() { + defer GinkgoRecover() + <-time.After(1 * time.Millisecond) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + rg.StopAndWait(ctx) + }() + + for i := 0; i < 200; i++ { + go func(i int) { + defer GinkgoRecover() + + <-time.After(time.Duration(i) * time.Microsecond) + Expect(rg.Add(RunnableFunc(func(c context.Context) error { + <-ctx.Done() + return nil + }), func(_ context.Context) bool { + return true + })).To(SatisfyAny( + Succeed(), + Equal(errRunnableGroupStopped), + )) + }(i) + } + }) + It("should not turn ready if some readiness check fail", func() { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel()