From 851eccd33abcc0aa5071ead021e7272e3a6fdd7c Mon Sep 17 00:00:00 2001
From: Vince Prignano
Date: Wed, 29 Sep 2021 10:55:09 -0700
Subject: [PATCH] Manager should support retrying to start runnables with
 backoff

This changeset allows a Manager to retry starting runnables, rather than
failing immediately, when a wait.Backoff parameter is given as
RunnableRetryBackoff in Options.

Currently, if a runnable fails to start, the Start operation is never
retried, which can cause the manager and all webhooks to stop and the
deployment to go into CrashLoopBackOff. Given the eventual consistency of
controllers and managers cooperating with other controllers or the
api-server, allow a configurable backoff that retries starting runnables a
number of times before giving up.

Signed-off-by: Vince Prignano
---
 pkg/manager/internal.go     | 29 ++++++++++++++++-
 pkg/manager/manager.go      |  6 ++++
 pkg/manager/manager_test.go | 64 +++++++++++++++++++++++++++++++++++++
 3 files changed, 98 insertions(+), 1 deletion(-)

diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go
index 7c25bd3c60..84e20a5ee3 100644
--- a/pkg/manager/internal.go
+++ b/pkg/manager/internal.go
@@ -30,6 +30,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/meta"
 	"k8s.io/apimachinery/pkg/runtime"
 	kerrors "k8s.io/apimachinery/pkg/util/errors"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/leaderelection"
 	"k8s.io/client-go/tools/leaderelection/resourcelock"
@@ -160,6 +161,10 @@ type controllerManager struct {
 	// between tries of actions.
 	retryPeriod time.Duration
 
+	// runnableRetryBackoff, if set, instructs the manager to retry starting
+	// runnables when an error occurs, failing only after the backoff expires.
+	runnableRetryBackoff *wait.Backoff
+
 	// waitForRunnable is holding the number of runnables currently running so that
 	// we can wait for them to exit before quitting the manager
 	waitForRunnable sync.WaitGroup
@@ -693,7 +698,29 @@ func (cm *controllerManager) startRunnable(r Runnable) {
 	cm.waitForRunnable.Add(1)
 	go func() {
 		defer cm.waitForRunnable.Done()
-		if err := r.Start(cm.internalCtx); err != nil {
+
+		// If there is no retry backoff, keep the old behavior and return right away.
+		if cm.runnableRetryBackoff == nil {
+			if err := r.Start(cm.internalCtx); err != nil {
+				cm.errChan <- err
+			}
+			return
+		}
+
+		// Otherwise retry with exponential backoff, calling Start multiple
+		// times until it either succeeds or the backoff expires.
+		var lastError error
+		if err := wait.ExponentialBackoffWithContext(cm.internalCtx, *cm.runnableRetryBackoff, func() (bool, error) {
+			if err := r.Start(cm.internalCtx); err != nil {
+				lastError = err
+				return false, nil
+			}
+			return true, nil
+		}); err != nil {
+			if lastError != nil {
+				cm.errChan <- fmt.Errorf("failed to run runnable, %s: %w", err, lastError)
+				return
+			}
 			cm.errChan <- err
 		}
 	}()
diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go
index 2d2733f0a6..5c6d6c6f29 100644
--- a/pkg/manager/manager.go
+++ b/pkg/manager/manager.go
@@ -28,6 +28,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/leaderelection/resourcelock"
 	"k8s.io/client-go/tools/record"
@@ -264,6 +265,10 @@ type Options struct {
 	// +optional
 	Controller v1alpha1.ControllerConfigurationSpec
 
+	// RunnableRetryBackoff, if set, instructs the manager to retry starting
+	// runnables when an error occurs, failing only after the backoff expires.
+	RunnableRetryBackoff *wait.Backoff
+
 	// makeBroadcaster allows deferring the creation of the broadcaster to
 	// avoid leaking goroutines if we never call Start on this manager. It also
 	// returns whether or not this is a "owned" broadcaster, and as such should be
@@ -388,6 +393,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
 		internalProceduresStop:        make(chan struct{}),
 		leaderElectionStopped:         make(chan struct{}),
 		leaderElectionReleaseOnCancel: options.LeaderElectionReleaseOnCancel,
+		runnableRetryBackoff:          options.RunnableRetryBackoff,
 	}, nil
 }
 
diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go
index b697751799..91a48eb19f 100644
--- a/pkg/manager/manager_test.go
+++ b/pkg/manager/manager_test.go
@@ -37,6 +37,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/leaderelection/resourcelock"
 	configv1alpha1 "k8s.io/component-base/config/v1alpha1"
@@ -999,6 +1000,69 @@ var _ = Describe("manger.Manager", func() {
 			<-runnableStopped
 		})
 
+		It("should wait for runnables if exponential backoff is set", func() {
+			m, err := New(cfg, options)
+			Expect(err).NotTo(HaveOccurred())
+			for _, cb := range callbacks {
+				cb(m)
+			}
+			m.(*controllerManager).runnableRetryBackoff = &wait.Backoff{
+				Duration: 10 * time.Millisecond,
+				Steps:    5,
+				Jitter:   1.0,
+			}
+
+			called := 0
+			runnableStopped := make(chan struct{})
+			now := time.Now()
+			Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
+				called++
+				if time.Since(now).Milliseconds() > 30 {
+					close(runnableStopped)
+					return nil
+				}
+				return errors.New("not yet")
+			}))).ToNot(HaveOccurred())
+
+			ctx, cancel := context.WithCancel(context.Background())
+			managerStopDone := make(chan struct{})
+			go func() {
+				Expect(m.Start(ctx)).NotTo(HaveOccurred())
+				close(managerStopDone)
+			}()
+			<-runnableStopped
+			<-m.(*controllerManager).elected
+			cancel()
+
+			Expect(called).To(BeNumerically(">=", 1))
+		})
+
+		It("should error if a runnable takes too long to start and backoff is enabled", func() {
+			m, err := New(cfg, options)
+			Expect(err).NotTo(HaveOccurred())
+			for _, cb := range callbacks {
+				cb(m)
+			}
+			m.(*controllerManager).runnableRetryBackoff = &wait.Backoff{
+				Duration: 10 * time.Millisecond,
+				Steps:    5,
+				Jitter:   1.0,
+			}
+
+			now := time.Now()
+			Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
+				if time.Since(now).Milliseconds() > 100 {
+					return nil
+				}
+				return errors.New("not yet")
+			}))).ToNot(HaveOccurred())
+
+			ctx, cancel := context.WithCancel(context.Background())
+			err = m.Start(ctx)
+			Expect(err).To(HaveOccurred())
+			Expect(err.Error()).To(ContainSubstring("not yet"))
+			cancel()
+		})
 	}
 
 	Context("with defaults", func() {
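
Usage sketch (reviewer note, not part of the patch): assuming the
RunnableRetryBackoff option lands as proposed above, a consumer would opt in
when constructing the manager. The backoff values below are illustrative
only, not recommendations; wait.Backoff also supports a Cap field to bound
the interval between retries.

package main

import (
	"os"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

func main() {
	// Retry each runnable's Start up to 5 times, starting at 100ms and
	// doubling the wait between attempts, before the manager gives up.
	mgr, err := manager.New(ctrl.GetConfigOrDie(), manager.Options{
		RunnableRetryBackoff: &wait.Backoff{
			Duration: 100 * time.Millisecond,
			Factor:   2.0,
			Steps:    5,
		},
	})
	if err != nil {
		os.Exit(1)
	}

	// Runnables added to mgr now retry on startup failure instead of
	// tearing down the whole manager on the first error.
	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		os.Exit(1)
	}
}

Leaving RunnableRetryBackoff nil preserves the existing fail-fast behavior,
so the change is opt-in and backward compatible.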