Skip to content

Commit

Permalink
feat(manager): add prestart hook support
Browse files Browse the repository at this point in the history
When implementing a controller that uses leader election, there maybe be
work that needs to be done after winning the election but before
processing enqueued requests. For example, a controller may need to
build up an internal mapping of the current state of the cluster before
it can begin reconciling.

This changeset adds support for adding prestart hooks to
controller-runtime's manager implementation. This hook runs after the
manager has been elected leader, immediately before the leader election
controllers are started.

Related #607
  • Loading branch information
terinjokes committed Dec 27, 2022
1 parent 3c4deba commit fea0031
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 0 deletions.
4 changes: 4 additions & 0 deletions pkg/config/v1alpha1/types.go
Expand Up @@ -55,6 +55,10 @@ type ControllerManagerConfigurationSpec struct {
// The graceful shutdown is skipped for safety reasons in case the leader election lease is lost.
GracefulShutdownTimeout *metav1.Duration `json:"gracefulShutDown,omitempty"`

// PrestartTimeout is the duration given to each prestart hook before they return successfully.
// To use prestart hooks without a timeout, set to a negative duration, e.g. time.Duration(-1)
PrestartTimeout *metav1.Duration `json:"prestartTimeout,omitempty"`

// Controller contains global configuration options for controllers
// registered within this manager.
// +optional
Expand Down
33 changes: 33 additions & 0 deletions pkg/manager/internal.go
Expand Up @@ -56,6 +56,7 @@ const (
defaultRenewDeadline = 10 * time.Second
defaultRetryPeriod = 2 * time.Second
defaultGracefulShutdownPeriod = 30 * time.Second
defaultPrestartPeriod = 15 * time.Second

defaultReadinessEndpoint = "/readyz"
defaultLivenessEndpoint = "/healthz"
Expand Down Expand Up @@ -176,6 +177,13 @@ type controllerManager struct {
// internalProceduresStop channel is used internally to the manager when coordinating
// the proper shutdown of servers. This channel is also used for dependency injection.
internalProceduresStop chan struct{}

// prestartHooks are functions that are run immediately before calling the Start functions
// of the leader election runnables.
prestartHooks []func(ctx context.Context) error

// prestartTimeout is the duration given to each prestart hook to return successfully.
prestartTimeout time.Duration
}

type hasCache interface {
Expand Down Expand Up @@ -272,6 +280,19 @@ func (cm *controllerManager) AddReadyzCheck(name string, check healthz.Checker)
return nil
}

// AddPrestartHook allows you to add prestart hooks.
func (cm *controllerManager) AddPrestartHook(hook func(ctx context.Context) error) error {
cm.Lock()
defer cm.Unlock()

if cm.started {
return fmt.Errorf("unable to add new prestart hook because the manager has already been started")
}

cm.prestartHooks = append(cm.prestartHooks, hook)
return nil
}

func (cm *controllerManager) GetConfig() *rest.Config {
return cm.cluster.GetConfig()
}
Expand Down Expand Up @@ -611,6 +632,18 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e
}

func (cm *controllerManager) startLeaderElectionRunnables() error {
cm.logger.Info("Starting prestart hooks")
for _, hook := range cm.prestartHooks {
ctx, cancel := context.WithTimeout(cm.internalCtx, cm.prestartTimeout)
if err := hook(ctx); err != nil {
return err
}
cancel()
}

// All the prestart hooks have been run, clear the slice to free the underlying resources.
cm.prestartHooks = nil

return cm.runnables.LeaderElection.Start(cm.internalCtx)
}

Expand Down
20 changes: 20 additions & 0 deletions pkg/manager/manager.go
Expand Up @@ -79,6 +79,12 @@ type Manager interface {
// AddReadyzCheck allows you to add Readyz checker
AddReadyzCheck(name string, check healthz.Checker) error

// AddPrestartHook allows you to add a hook that runs after leader election and immediately
// before starting controllers needing leader election. Prestart hooks block execution of
// leader election controllers until all return nil error. The manager is stopped on non-nil
// errors.
AddPrestartHook(func(ctx context.Context) error) error

// Start starts all registered Controllers and blocks until the context is cancelled.
// Returns an error if there is an error starting any controller.
//
Expand Down Expand Up @@ -299,6 +305,10 @@ type Options struct {
// +optional
Controller v1alpha1.ControllerConfigurationSpec

// PrestartTimeout is the duration given to each prestart hook to return successfully.
// To use prestart hooks without timeout, set to a negative duration, e.g. time.Duration(-1)
PrestartTimeout *time.Duration

// makeBroadcaster allows deferring the creation of the broadcaster to
// avoid leaking goroutines if we never call Start on this manager. It also
// returns whether or not this is a "owned" broadcaster, and as such should be
Expand Down Expand Up @@ -447,6 +457,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
readinessEndpointName: options.ReadinessEndpointName,
livenessEndpointName: options.LivenessEndpointName,
gracefulShutdownTimeout: *options.GracefulShutdownTimeout,
prestartTimeout: *options.PrestartTimeout,
internalProceduresStop: make(chan struct{}),
leaderElectionStopped: make(chan struct{}),
leaderElectionReleaseOnCancel: options.LeaderElectionReleaseOnCancel,
Expand Down Expand Up @@ -507,6 +518,10 @@ func (o Options) AndFrom(loader config.ControllerManagerConfiguration) (Options,
o.CertDir = newObj.Webhook.CertDir
}

if o.PrestartTimeout == nil && newObj.PrestartTimeout != nil {
o.PrestartTimeout = &newObj.PrestartTimeout.Duration
}

if newObj.Controller != nil {
if o.Controller.CacheSyncTimeout == nil && newObj.Controller.CacheSyncTimeout != nil {
o.Controller.CacheSyncTimeout = newObj.Controller.CacheSyncTimeout
Expand Down Expand Up @@ -644,6 +659,11 @@ func setOptionsDefaults(options Options) Options {
options.GracefulShutdownTimeout = &gracefulShutdownTimeout
}

if options.PrestartTimeout == nil {
prestartTimeout := defaultPrestartPeriod
options.PrestartTimeout = &prestartTimeout
}

if options.Logger.GetSink() == nil {
options.Logger = log.Log
}
Expand Down
57 changes: 57 additions & 0 deletions pkg/manager/manager_test.go
Expand Up @@ -1076,6 +1076,63 @@ var _ = Describe("manger.Manager", func() {
<-runnableStopped
})

It("should run prestart hooks before calling Start on leader election runnables", func() {
m, err := New(cfg, options)
Expect(err).NotTo(HaveOccurred())
for _, cb := range callbacks {
cb(m)
}

runnableRan := make(chan struct{})

Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
close(runnableRan)
return nil
})))

Expect(m.AddPrestartHook(func(ctx context.Context) error {
Expect(m.Elected()).ShouldNot(BeClosed())
Consistently(runnableRan).ShouldNot(BeClosed())
return nil
}))

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
defer GinkgoRecover()
Expect(m.Elected()).ShouldNot(BeClosed())
Expect(m.Start(ctx)).NotTo(HaveOccurred())
}()

<-m.Elected()
})

It("should not run leader election runnables if prestart hooks fail", func() {
m, err := New(cfg, options)
Expect(err).NotTo(HaveOccurred())
for _, cb := range callbacks {
cb(m)
}

runnableRan := make(chan struct{})

Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
close(runnableRan)
return nil
})))

Expect(m.AddPrestartHook(func(ctx context.Context) error {
Expect(m.Elected()).ShouldNot(BeClosed())
Consistently(runnableRan).ShouldNot(BeClosed())
return errors.New("prestart hook failed")
}))

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expect(m.Elected()).ShouldNot(BeClosed())
Expect(m.Start(ctx)).Should(MatchError(ContainSubstring("prestart hook failed")))
})
}

Context("with defaults", func() {
Expand Down

0 comments on commit fea0031

Please sign in to comment.