From fea003120c5a7bd7e2095fff32f8ce505c93b823 Mon Sep 17 00:00:00 2001 From: Terin Stock Date: Mon, 14 Nov 2022 15:51:09 +0100 Subject: [PATCH] feat(manager): add prestart hook support When implementing a controller that uses leader election, there maybe be work that needs to be done after winning the election but before processing enqueued requests. For example, a controller may need to build up an internal mapping of the current state of the cluster before it can begin reconciling. This changeset adds support for adding prestart hooks to controller-runtime's manager implementation. This hook runs after the manager has been elected leader, immediately before the leader election controllers are started. Related #607 --- pkg/config/v1alpha1/types.go | 4 +++ pkg/manager/internal.go | 33 +++++++++++++++++++++ pkg/manager/manager.go | 20 +++++++++++++ pkg/manager/manager_test.go | 57 ++++++++++++++++++++++++++++++++++++ 4 files changed, 114 insertions(+) diff --git a/pkg/config/v1alpha1/types.go b/pkg/config/v1alpha1/types.go index f2226278c6..bc29d80ae7 100644 --- a/pkg/config/v1alpha1/types.go +++ b/pkg/config/v1alpha1/types.go @@ -55,6 +55,10 @@ type ControllerManagerConfigurationSpec struct { // The graceful shutdown is skipped for safety reasons in case the leader election lease is lost. GracefulShutdownTimeout *metav1.Duration `json:"gracefulShutDown,omitempty"` + // PrestartTimeout is the duration given to each prestart hook before they return successfully. + // To use prestart hooks without a timeout, set to a negative duration, e.g. time.Duration(-1) + PrestartTimeout *metav1.Duration `json:"prestartTimeout,omitempty"` + // Controller contains global configuration options for controllers // registered within this manager. // +optional diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index 3e79f50bbd..496667b441 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -56,6 +56,7 @@ const ( defaultRenewDeadline = 10 * time.Second defaultRetryPeriod = 2 * time.Second defaultGracefulShutdownPeriod = 30 * time.Second + defaultPrestartPeriod = 15 * time.Second defaultReadinessEndpoint = "/readyz" defaultLivenessEndpoint = "/healthz" @@ -176,6 +177,13 @@ type controllerManager struct { // internalProceduresStop channel is used internally to the manager when coordinating // the proper shutdown of servers. This channel is also used for dependency injection. internalProceduresStop chan struct{} + + // prestartHooks are functions that are run immediately before calling the Start functions + // of the leader election runnables. + prestartHooks []func(ctx context.Context) error + + // prestartTimeout is the duration given to each prestart hook to return successfully. + prestartTimeout time.Duration } type hasCache interface { @@ -272,6 +280,19 @@ func (cm *controllerManager) AddReadyzCheck(name string, check healthz.Checker) return nil } +// AddPrestartHook allows you to add prestart hooks. +func (cm *controllerManager) AddPrestartHook(hook func(ctx context.Context) error) error { + cm.Lock() + defer cm.Unlock() + + if cm.started { + return fmt.Errorf("unable to add new prestart hook because the manager has already been started") + } + + cm.prestartHooks = append(cm.prestartHooks, hook) + return nil +} + func (cm *controllerManager) GetConfig() *rest.Config { return cm.cluster.GetConfig() } @@ -611,6 +632,18 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e } func (cm *controllerManager) startLeaderElectionRunnables() error { + cm.logger.Info("Starting prestart hooks") + for _, hook := range cm.prestartHooks { + ctx, cancel := context.WithTimeout(cm.internalCtx, cm.prestartTimeout) + if err := hook(ctx); err != nil { + return err + } + cancel() + } + + // All the prestart hooks have been run, clear the slice to free the underlying resources. + cm.prestartHooks = nil + return cm.runnables.LeaderElection.Start(cm.internalCtx) } diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 2facb1c915..cf2efd3a20 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -79,6 +79,12 @@ type Manager interface { // AddReadyzCheck allows you to add Readyz checker AddReadyzCheck(name string, check healthz.Checker) error + // AddPrestartHook allows you to add a hook that runs after leader election and immediately + // before starting controllers needing leader election. Prestart hooks block execution of + // leader election controllers until all return nil error. The manager is stopped on non-nil + // errors. + AddPrestartHook(func(ctx context.Context) error) error + // Start starts all registered Controllers and blocks until the context is cancelled. // Returns an error if there is an error starting any controller. // @@ -299,6 +305,10 @@ type Options struct { // +optional Controller v1alpha1.ControllerConfigurationSpec + // PrestartTimeout is the duration given to each prestart hook to return successfully. + // To use prestart hooks without timeout, set to a negative duration, e.g. time.Duration(-1) + PrestartTimeout *time.Duration + // makeBroadcaster allows deferring the creation of the broadcaster to // avoid leaking goroutines if we never call Start on this manager. It also // returns whether or not this is a "owned" broadcaster, and as such should be @@ -447,6 +457,7 @@ func New(config *rest.Config, options Options) (Manager, error) { readinessEndpointName: options.ReadinessEndpointName, livenessEndpointName: options.LivenessEndpointName, gracefulShutdownTimeout: *options.GracefulShutdownTimeout, + prestartTimeout: *options.PrestartTimeout, internalProceduresStop: make(chan struct{}), leaderElectionStopped: make(chan struct{}), leaderElectionReleaseOnCancel: options.LeaderElectionReleaseOnCancel, @@ -507,6 +518,10 @@ func (o Options) AndFrom(loader config.ControllerManagerConfiguration) (Options, o.CertDir = newObj.Webhook.CertDir } + if o.PrestartTimeout == nil && newObj.PrestartTimeout != nil { + o.PrestartTimeout = &newObj.PrestartTimeout.Duration + } + if newObj.Controller != nil { if o.Controller.CacheSyncTimeout == nil && newObj.Controller.CacheSyncTimeout != nil { o.Controller.CacheSyncTimeout = newObj.Controller.CacheSyncTimeout @@ -644,6 +659,11 @@ func setOptionsDefaults(options Options) Options { options.GracefulShutdownTimeout = &gracefulShutdownTimeout } + if options.PrestartTimeout == nil { + prestartTimeout := defaultPrestartPeriod + options.PrestartTimeout = &prestartTimeout + } + if options.Logger.GetSink() == nil { options.Logger = log.Log } diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index f3b8443a95..a2f6c9daf6 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -1076,6 +1076,63 @@ var _ = Describe("manger.Manager", func() { <-runnableStopped }) + It("should run prestart hooks before calling Start on leader election runnables", func() { + m, err := New(cfg, options) + Expect(err).NotTo(HaveOccurred()) + for _, cb := range callbacks { + cb(m) + } + + runnableRan := make(chan struct{}) + + Expect(m.Add(RunnableFunc(func(ctx context.Context) error { + close(runnableRan) + return nil + }))) + + Expect(m.AddPrestartHook(func(ctx context.Context) error { + Expect(m.Elected()).ShouldNot(BeClosed()) + Consistently(runnableRan).ShouldNot(BeClosed()) + return nil + })) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + defer GinkgoRecover() + Expect(m.Elected()).ShouldNot(BeClosed()) + Expect(m.Start(ctx)).NotTo(HaveOccurred()) + }() + + <-m.Elected() + }) + + It("should not run leader election runnables if prestart hooks fail", func() { + m, err := New(cfg, options) + Expect(err).NotTo(HaveOccurred()) + for _, cb := range callbacks { + cb(m) + } + + runnableRan := make(chan struct{}) + + Expect(m.Add(RunnableFunc(func(ctx context.Context) error { + close(runnableRan) + return nil + }))) + + Expect(m.AddPrestartHook(func(ctx context.Context) error { + Expect(m.Elected()).ShouldNot(BeClosed()) + Consistently(runnableRan).ShouldNot(BeClosed()) + return errors.New("prestart hook failed") + })) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + Expect(m.Elected()).ShouldNot(BeClosed()) + Expect(m.Start(ctx)).Should(MatchError(ContainSubstring("prestart hook failed"))) + }) } Context("with defaults", func() {