diff --git a/pkg/config/v1alpha1/types.go b/pkg/config/v1alpha1/types.go index f2226278c6..bc29d80ae7 100644 --- a/pkg/config/v1alpha1/types.go +++ b/pkg/config/v1alpha1/types.go @@ -55,6 +55,10 @@ type ControllerManagerConfigurationSpec struct { // The graceful shutdown is skipped for safety reasons in case the leader election lease is lost. GracefulShutdownTimeout *metav1.Duration `json:"gracefulShutDown,omitempty"` + // PrestartTimeout is the duration given to each prestart hook before they return successfully. + // To use prestart hooks without a timeout, set to a negative duration, e.g. time.Duration(-1) + PrestartTimeout *metav1.Duration `json:"prestartTimeout,omitempty"` + // Controller contains global configuration options for controllers // registered within this manager. // +optional diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index 3e79f50bbd..496667b441 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -56,6 +56,7 @@ const ( defaultRenewDeadline = 10 * time.Second defaultRetryPeriod = 2 * time.Second defaultGracefulShutdownPeriod = 30 * time.Second + defaultPrestartPeriod = 15 * time.Second defaultReadinessEndpoint = "/readyz" defaultLivenessEndpoint = "/healthz" @@ -176,6 +177,13 @@ type controllerManager struct { // internalProceduresStop channel is used internally to the manager when coordinating // the proper shutdown of servers. This channel is also used for dependency injection. internalProceduresStop chan struct{} + + // prestartHooks are functions that are run immediately before calling the Start functions + // of the leader election runnables. + prestartHooks []func(ctx context.Context) error + + // prestartTimeout is the duration given to each prestart hook to return successfully. + prestartTimeout time.Duration } type hasCache interface { @@ -272,6 +280,19 @@ func (cm *controllerManager) AddReadyzCheck(name string, check healthz.Checker) return nil } +// AddPrestartHook allows you to add prestart hooks. +func (cm *controllerManager) AddPrestartHook(hook func(ctx context.Context) error) error { + cm.Lock() + defer cm.Unlock() + + if cm.started { + return fmt.Errorf("unable to add new prestart hook because the manager has already been started") + } + + cm.prestartHooks = append(cm.prestartHooks, hook) + return nil +} + func (cm *controllerManager) GetConfig() *rest.Config { return cm.cluster.GetConfig() } @@ -611,6 +632,18 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e } func (cm *controllerManager) startLeaderElectionRunnables() error { + cm.logger.Info("Starting prestart hooks") + for _, hook := range cm.prestartHooks { + ctx, cancel := context.WithTimeout(cm.internalCtx, cm.prestartTimeout) + if err := hook(ctx); err != nil { + return err + } + cancel() + } + + // All the prestart hooks have been run, clear the slice to free the underlying resources. + cm.prestartHooks = nil + return cm.runnables.LeaderElection.Start(cm.internalCtx) } diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 2facb1c915..cf2efd3a20 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -79,6 +79,12 @@ type Manager interface { // AddReadyzCheck allows you to add Readyz checker AddReadyzCheck(name string, check healthz.Checker) error + // AddPrestartHook allows you to add a hook that runs after leader election and immediately + // before starting controllers needing leader election. Prestart hooks block execution of + // leader election controllers until all return nil error. The manager is stopped on non-nil + // errors. + AddPrestartHook(func(ctx context.Context) error) error + // Start starts all registered Controllers and blocks until the context is cancelled. // Returns an error if there is an error starting any controller. // @@ -299,6 +305,10 @@ type Options struct { // +optional Controller v1alpha1.ControllerConfigurationSpec + // PrestartTimeout is the duration given to each prestart hook to return successfully. + // To use prestart hooks without timeout, set to a negative duration, e.g. time.Duration(-1) + PrestartTimeout *time.Duration + // makeBroadcaster allows deferring the creation of the broadcaster to // avoid leaking goroutines if we never call Start on this manager. It also // returns whether or not this is a "owned" broadcaster, and as such should be @@ -447,6 +457,7 @@ func New(config *rest.Config, options Options) (Manager, error) { readinessEndpointName: options.ReadinessEndpointName, livenessEndpointName: options.LivenessEndpointName, gracefulShutdownTimeout: *options.GracefulShutdownTimeout, + prestartTimeout: *options.PrestartTimeout, internalProceduresStop: make(chan struct{}), leaderElectionStopped: make(chan struct{}), leaderElectionReleaseOnCancel: options.LeaderElectionReleaseOnCancel, @@ -507,6 +518,10 @@ func (o Options) AndFrom(loader config.ControllerManagerConfiguration) (Options, o.CertDir = newObj.Webhook.CertDir } + if o.PrestartTimeout == nil && newObj.PrestartTimeout != nil { + o.PrestartTimeout = &newObj.PrestartTimeout.Duration + } + if newObj.Controller != nil { if o.Controller.CacheSyncTimeout == nil && newObj.Controller.CacheSyncTimeout != nil { o.Controller.CacheSyncTimeout = newObj.Controller.CacheSyncTimeout @@ -644,6 +659,11 @@ func setOptionsDefaults(options Options) Options { options.GracefulShutdownTimeout = &gracefulShutdownTimeout } + if options.PrestartTimeout == nil { + prestartTimeout := defaultPrestartPeriod + options.PrestartTimeout = &prestartTimeout + } + if options.Logger.GetSink() == nil { options.Logger = log.Log } diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index f3b8443a95..a2f6c9daf6 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -1076,6 +1076,63 @@ var _ = Describe("manger.Manager", func() { <-runnableStopped }) + It("should run prestart hooks before calling Start on leader election runnables", func() { + m, err := New(cfg, options) + Expect(err).NotTo(HaveOccurred()) + for _, cb := range callbacks { + cb(m) + } + + runnableRan := make(chan struct{}) + + Expect(m.Add(RunnableFunc(func(ctx context.Context) error { + close(runnableRan) + return nil + }))) + + Expect(m.AddPrestartHook(func(ctx context.Context) error { + Expect(m.Elected()).ShouldNot(BeClosed()) + Consistently(runnableRan).ShouldNot(BeClosed()) + return nil + })) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + defer GinkgoRecover() + Expect(m.Elected()).ShouldNot(BeClosed()) + Expect(m.Start(ctx)).NotTo(HaveOccurred()) + }() + + <-m.Elected() + }) + + It("should not run leader election runnables if prestart hooks fail", func() { + m, err := New(cfg, options) + Expect(err).NotTo(HaveOccurred()) + for _, cb := range callbacks { + cb(m) + } + + runnableRan := make(chan struct{}) + + Expect(m.Add(RunnableFunc(func(ctx context.Context) error { + close(runnableRan) + return nil + }))) + + Expect(m.AddPrestartHook(func(ctx context.Context) error { + Expect(m.Elected()).ShouldNot(BeClosed()) + Consistently(runnableRan).ShouldNot(BeClosed()) + return errors.New("prestart hook failed") + })) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + Expect(m.Elected()).ShouldNot(BeClosed()) + Expect(m.Start(ctx)).Should(MatchError(ContainSubstring("prestart hook failed"))) + }) } Context("with defaults", func() {