diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index 54e1fed5df..76cf7abf9f 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -54,6 +54,7 @@ const ( defaultRenewDeadline = 10 * time.Second defaultRetryPeriod = 2 * time.Second defaultGracefulShutdownPeriod = 30 * time.Second + defaultHookPeriod = 15 * time.Second defaultReadinessEndpoint = "/readyz" defaultLivenessEndpoint = "/healthz" @@ -163,6 +164,13 @@ type controllerManager struct { // internalProceduresStop channel is used internally to the manager when coordinating // the proper shutdown of servers. This channel is also used for dependency injection. internalProceduresStop chan struct{} + + // prestartHooks are functions that are run immediately before calling the Start functions + // of the leader election runnables. + prestartHooks []Runnable + + // hookTimeout is the duration given to each hook to return successfully. + hookTimeout time.Duration } type hasCache interface { @@ -241,6 +249,23 @@ func (cm *controllerManager) GetHTTPClient() *http.Client { return cm.cluster.GetHTTPClient() } +// AddHook allows you to add hooks. +func (cm *controllerManager) AddHook(hook HookType, runnable Runnable) error { + cm.Lock() + defer cm.Unlock() + + if cm.started { + return fmt.Errorf("unable to add new hook because the manager has already been started") + } + + switch hook { + case HookPrestartType: + cm.prestartHooks = append(cm.prestartHooks, runnable) + } + + return nil +} + func (cm *controllerManager) GetConfig() *rest.Config { return cm.cluster.GetConfig() } @@ -580,6 +605,19 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e } func (cm *controllerManager) startLeaderElectionRunnables() error { + cm.logger.Info("Starting prestart hooks") + for _, hook := range cm.prestartHooks { + ctx, cancel := context.WithTimeout(cm.internalCtx, cm.hookTimeout) + if err := hook.Start(ctx); err != nil { + cancel() + return err + } + cancel() + } + + // All the prestart hooks have been run, clear the slice to free the underlying resources. + cm.prestartHooks = nil + return cm.runnables.LeaderElection.Start(cm.internalCtx) } diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 7b247c44ad..f1f36d5f7f 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -77,6 +77,9 @@ type Manager interface { // AddReadyzCheck allows you to add Readyz checker AddReadyzCheck(name string, check healthz.Checker) error + // AddHook allows to add Runnables as hooks to modify the behavior of the controller. + AddHook(hook HookType, runnable Runnable) error + // Start starts all registered Controllers and blocks until the context is cancelled. // Returns an error if there is an error starting any controller. // @@ -336,6 +339,10 @@ type Options struct { // +optional Controller config.Controller + // HookTimeout is the duration given to each hook to return successfully. + // To use hooks without timeout, set to a negative duration, e.g. time.Duration(-1) + HookTimeout *time.Duration + // makeBroadcaster allows deferring the creation of the broadcaster to // avoid leaking goroutines if we never call Start on this manager. It also // returns whether or not this is a "owned" broadcaster, and as such should be @@ -349,6 +356,15 @@ type Options struct { newHealthProbeListener func(addr string) (net.Listener, error) } +// HookType defines hooks for use with AddHook. +type HookType int + +const ( + // HookPrestartType defines a hook that is run after leader election and immediately before + // calling Start on the runnalbes that needed leader election. + HookPrestartType HookType = iota +) + // BaseContextFunc is a function used to provide a base Context to Runnables // managed by a Manager. type BaseContextFunc func() context.Context @@ -482,6 +498,7 @@ func New(config *rest.Config, options Options) (Manager, error) { readinessEndpointName: options.ReadinessEndpointName, livenessEndpointName: options.LivenessEndpointName, gracefulShutdownTimeout: *options.GracefulShutdownTimeout, + hookTimeout: *options.HookTimeout, internalProceduresStop: make(chan struct{}), leaderElectionStopped: make(chan struct{}), leaderElectionReleaseOnCancel: options.LeaderElectionReleaseOnCancel, @@ -691,6 +708,11 @@ func setOptionsDefaults(options Options) Options { options.GracefulShutdownTimeout = &gracefulShutdownTimeout } + if options.HookTimeout == nil { + hookTimeout := defaultHookPeriod + options.HookTimeout = &hookTimeout + } + if options.Logger.GetSink() == nil { options.Logger = log.Log } diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 1368ea83f0..306cdec016 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -1134,6 +1134,86 @@ var _ = Describe("manger.Manager", func() { Expect(time.Since(beforeDone)).To(BeNumerically(">=", 1500*time.Millisecond)) }) + It("should run prestart hooks before calling Start on leader election runnables", func() { + m, err := New(cfg, options) + Expect(err).NotTo(HaveOccurred()) + for _, cb := range callbacks { + cb(m) + } + + runnableRan := make(chan struct{}) + + Expect(m.Add(RunnableFunc(func(ctx context.Context) error { + close(runnableRan) + return nil + }))) + + Expect(m.AddHook(HookPrestartType, RunnableFunc(func(ctx context.Context) error { + Expect(m.Elected()).ShouldNot(BeClosed()) + Consistently(runnableRan).ShouldNot(BeClosed()) + return nil + }))) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + defer GinkgoRecover() + Expect(m.Elected()).ShouldNot(BeClosed()) + Expect(m.Start(ctx)).NotTo(HaveOccurred()) + }() + + <-m.Elected() + }) + + It("should run prestart hooks with timeout", func() { + m, err := New(cfg, options) + Expect(err).NotTo(HaveOccurred()) + for _, cb := range callbacks { + cb(m) + } + m.(*controllerManager).hookTimeout = 1 * time.Nanosecond + + Expect(m.AddHook(HookPrestartType, RunnableFunc(func(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(1 * time.Second): + return errors.New("prestart hook timeout exceeded expected") + } + }))) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + Expect(m.Start(ctx)).Should(MatchError(context.DeadlineExceeded)) + }) + + It("should not run leader election runnables if prestart hooks fail", func() { + m, err := New(cfg, options) + Expect(err).NotTo(HaveOccurred()) + for _, cb := range callbacks { + cb(m) + } + + runnableRan := make(chan struct{}) + + Expect(m.Add(RunnableFunc(func(ctx context.Context) error { + close(runnableRan) + return nil + }))) + + Expect(m.AddHook(HookPrestartType, RunnableFunc(func(ctx context.Context) error { + Expect(m.Elected()).ShouldNot(BeClosed()) + Consistently(runnableRan).ShouldNot(BeClosed()) + return errors.New("prestart hook failed") + }))) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + Expect(m.Elected()).ShouldNot(BeClosed()) + Expect(m.Start(ctx)).Should(MatchError(ContainSubstring("prestart hook failed"))) + }) } Context("with defaults", func() {