diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go
index 7c25bd3c60..84e20a5ee3 100644
--- a/pkg/manager/internal.go
+++ b/pkg/manager/internal.go
@@ -30,6 +30,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/meta"
 	"k8s.io/apimachinery/pkg/runtime"
 	kerrors "k8s.io/apimachinery/pkg/util/errors"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/leaderelection"
 	"k8s.io/client-go/tools/leaderelection/resourcelock"
@@ -160,6 +161,10 @@ type controllerManager struct {
 	// between tries of actions.
 	retryPeriod time.Duration
 
+	// runnableRetryBackoff, if set, instructs the manager to retry to start runnables
+	// if an error occurs and only fail after a certain amount of time.
+	runnableRetryBackoff *wait.Backoff
+
 	// waitForRunnable is holding the number of runnables currently running so that
 	// we can wait for them to exit before quitting the manager
 	waitForRunnable sync.WaitGroup
@@ -693,7 +698,29 @@ func (cm *controllerManager) startRunnable(r Runnable) {
 	cm.waitForRunnable.Add(1)
 	go func() {
 		defer cm.waitForRunnable.Done()
-		if err := r.Start(cm.internalCtx); err != nil {
+
+		// If there is no retry backoff, keep the old logic and return right away.
+		if cm.runnableRetryBackoff == nil {
+			if err := r.Start(cm.internalCtx); err != nil {
+				cm.errChan <- err
+			}
+			return
+		}
+
+		// If we should wait and run into exponential backoff, call Start multiple
+		// times until it either succeeds, or the backoff expires.
+		var lastError error
+		if err := wait.ExponentialBackoffWithContext(cm.internalCtx, *cm.runnableRetryBackoff, func() (bool, error) {
+			if err := r.Start(cm.internalCtx); err != nil {
+				lastError = err
+				return false, nil
+			}
+			return true, nil
+		}); err != nil {
+			if lastError != nil {
+				cm.errChan <- fmt.Errorf("failed to run runnable, %s: %w", err, lastError)
+				return
+			}
 			cm.errChan <- err
 		}
 	}()
diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go
index 2d2733f0a6..5c6d6c6f29 100644
--- a/pkg/manager/manager.go
+++ b/pkg/manager/manager.go
@@ -28,6 +28,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/leaderelection/resourcelock"
 	"k8s.io/client-go/tools/record"
@@ -264,6 +265,10 @@ type Options struct {
 	// +optional
 	Controller v1alpha1.ControllerConfigurationSpec
 
+	// RunnableRetryBackoff, if set, instructs the manager to retry to start runnables
+	// if an error occurs and only fail after a certain amount of time.
+	RunnableRetryBackoff *wait.Backoff
+
 	// makeBroadcaster allows deferring the creation of the broadcaster to
 	// avoid leaking goroutines if we never call Start on this manager. It also
 	// returns whether or not this is a "owned" broadcaster, and as such should be
@@ -388,6 +393,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
 		internalProceduresStop:        make(chan struct{}),
 		leaderElectionStopped:         make(chan struct{}),
 		leaderElectionReleaseOnCancel: options.LeaderElectionReleaseOnCancel,
+		runnableRetryBackoff:          options.RunnableRetryBackoff,
 	}, nil
 }
 
diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go
index b697751799..91a48eb19f 100644
--- a/pkg/manager/manager_test.go
+++ b/pkg/manager/manager_test.go
@@ -37,6 +37,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/leaderelection/resourcelock"
 	configv1alpha1 "k8s.io/component-base/config/v1alpha1"
@@ -999,6 +1000,69 @@ var _ = Describe("manger.Manager", func() {
 			<-runnableStopped
 		})
 
+		It("should wait for runnables if exponential backoff is set", func() {
+			m, err := New(cfg, options)
+			Expect(err).NotTo(HaveOccurred())
+			for _, cb := range callbacks {
+				cb(m)
+			}
+			m.(*controllerManager).runnableRetryBackoff = &wait.Backoff{
+				Duration: 10 * time.Millisecond,
+				Steps:    5,
+				Jitter:   1.0,
+			}
+
+			called := 0
+			runnableStopped := make(chan struct{})
+			now := time.Now()
+			Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
+				called++
+				if time.Now().Sub(now).Milliseconds() > 30 {
+					close(runnableStopped)
+					return nil
+				}
+				return errors.New("not yet")
+			}))).ToNot(HaveOccurred())
+
+			ctx, cancel := context.WithCancel(context.Background())
+			managerStopDone := make(chan struct{})
+			go func() {
+				Expect(m.Start(ctx)).NotTo(HaveOccurred())
+				close(managerStopDone)
+			}()
+			<-runnableStopped
+			<-m.(*controllerManager).elected
+			cancel()
+
+			Expect(called).To(BeNumerically(">=", 1))
+		})
+
+		It("should error if a runnable takes too long to run and backoff is enabled", func() {
+			m, err := New(cfg, options)
+			Expect(err).NotTo(HaveOccurred())
+			for _, cb := range callbacks {
+				cb(m)
+			}
+			m.(*controllerManager).runnableRetryBackoff = &wait.Backoff{
+				Duration: 10 * time.Millisecond,
+				Steps:    5,
+				Jitter:   1.0,
+			}
+
+			now := time.Now()
+			Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
+				if time.Now().Sub(now).Milliseconds() > 100 {
+					return nil
+				}
+				return errors.New("not yet")
+			}))).ToNot(HaveOccurred())
+
+			ctx, cancel := context.WithCancel(context.Background())
+			err = m.Start(ctx)
+			Expect(err).To(HaveOccurred())
+			Expect(err.Error()).To(ContainSubstring("not yet"))
+			cancel()
+		})
 	}
 
 	Context("with defaults", func() {
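For reference, a minimal sketch of how a caller could configure the new option. This is not part of the diff: RunnableRetryBackoff only exists with this patch applied, and the surrounding calls (manager.New, ctrl.GetConfigOrDie, ctrl.SetupSignalHandler, manager.RunnableFunc, wait.Backoff) are standard controller-runtime / apimachinery APIs; the timing values are arbitrary.

package main

import (
	"context"
	"errors"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

func main() {
	// Hypothetical usage of the option added in this PR: retry Start on
	// runnables instead of failing the manager on the first error.
	mgr, err := manager.New(ctrl.GetConfigOrDie(), manager.Options{
		RunnableRetryBackoff: &wait.Backoff{
			Duration: 10 * time.Millisecond, // initial delay between attempts
			Steps:    5,                     // give up after five failed attempts
			Jitter:   1.0,                   // randomize delays between retries
		},
	})
	if err != nil {
		panic(err)
	}

	// A runnable whose Start fails transiently; with the backoff configured
	// above, the manager retries it rather than surfacing the error immediately.
	ready := time.Now().Add(50 * time.Millisecond)
	if err := mgr.Add(manager.RunnableFunc(func(ctx context.Context) error {
		if time.Now().Before(ready) {
			return errors.New("dependency not ready yet")
		}
		<-ctx.Done()
		return nil
	})); err != nil {
		panic(err)
	}

	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		panic(err)
	}
}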