From 612e9b28858354c69696a91b0778f3440ce4687c Mon Sep 17 00:00:00 2001 From: Vince Prignano Date: Thu, 14 Oct 2021 08:02:13 -0700 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Refactor=20manager=20to=20avoid?= =?UTF-8?q?=20race=20conditions=20and=20provide=20clean=20shutdown?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This changeset provides a series of improvements and refactors how the manager starts and stops. During testing with Cluster API (a user of controller runtime), folks noticed that the manager which runs a series of components can deadlock itself when using conversion webhooks, or health checks, or won't cleanly shutdown and cleanup all the running controller, runnables, caches, webhooks, and http servers. In particular: - The Manager internal mutex didn't actually lock operations while the manager was in the process of starting up. The manager internal Start() implementation started off a series of goroutines internally and then waits. Concurrent operations on the manager, like AddHealthzCheck or AddReadyzCheck or AddMetricsExtraHandler modified the internals map while or after their respective servers were being configured, causing potential races or being ineffective. - Unclear ordering of the manager caused deadlock when the caches would start up. Upon startup, conversion webhooks are required when waiting for the cache initial List() call, which warms the internal caches. If a webook server or a healthz/readyz probe didn't start in time, the cache List() call fails because the webhooks would be unavailable. - Manager would say it was Elected() (note: this is used regardless if leader election is enabled or not) without waiting for all the caches to warm up, which could result in failed client calls. - Stop proceduce cancelled everything at once regardless of ordering. Previously, the context cancelled all the runnables regardless of ordering which can also cause dependencies issues. With these changes, if graceful shutdown is set, we try to cancel and wait for runnable groups to be done in a strict order before proceeding to exit the program. - Stop procedure cancelled leader election only if graceful shutdown was set. This was probably an oversight, now we're cancelling leader election regardless if graceful timeout is set or not. - The http.Server used throughout the codebase now properly sets idle and read header timeout to match the api-server. Signed-off-by: Vince Prignano --- pkg/internal/httpserver/server.go | 16 + pkg/manager/internal.go | 487 +++++++++++------------- pkg/manager/manager.go | 7 + pkg/manager/manager_test.go | 77 ++-- pkg/manager/runnable_group.go | 296 ++++++++++++++ pkg/manager/runnable_group_test.go | 182 +++++++++ pkg/webhook/server.go | 5 +- pkg/webhook/webhook_integration_test.go | 3 +- 8 files changed, 772 insertions(+), 301 deletions(-) create mode 100644 pkg/internal/httpserver/server.go create mode 100644 pkg/manager/runnable_group.go create mode 100644 pkg/manager/runnable_group_test.go diff --git a/pkg/internal/httpserver/server.go b/pkg/internal/httpserver/server.go new file mode 100644 index 0000000000..b5f91f18e0 --- /dev/null +++ b/pkg/internal/httpserver/server.go @@ -0,0 +1,16 @@ +package httpserver + +import ( + "net/http" + "time" +) + +// New returns a new server with sane defaults. +func New(handler http.Handler) *http.Server { + return &http.Server{ + Handler: handler, + MaxHeaderBytes: 1 << 20, + IdleTimeout: 90 * time.Second, // matches http.DefaultTransport keep-alive timeout + ReadHeaderTimeout: 32 * time.Second, + } +} diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index cd01715b4e..6fb59abdc4 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -23,6 +23,7 @@ import ( "net" "net/http" "sync" + "sync/atomic" "time" "github.com/go-logr/logr" @@ -30,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" kerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/rest" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" @@ -40,6 +42,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/config/v1alpha1" "sigs.k8s.io/controller-runtime/pkg/healthz" + "sigs.k8s.io/controller-runtime/pkg/internal/httpserver" intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder" "sigs.k8s.io/controller-runtime/pkg/metrics" "sigs.k8s.io/controller-runtime/pkg/runtime/inject" @@ -61,16 +64,15 @@ const ( var _ Runnable = &controllerManager{} type controllerManager struct { - // cluster holds a variety of methods to interact with a cluster. Required. - cluster cluster.Cluster + sync.Mutex + started bool - // leaderElectionRunnables is the set of Controllers that the controllerManager injects deps into and Starts. - // These Runnables are managed by lead election. - leaderElectionRunnables []Runnable + stopProcedureEngaged *int64 + errChan chan error + runnables *runnables - // nonLeaderElectionRunnables is the set of webhook servers that the controllerManager injects deps into and Starts. - // These Runnables will not be blocked by lead election. - nonLeaderElectionRunnables []Runnable + // cluster holds a variety of methods to interact with a cluster. Required. + cluster cluster.Cluster // recorderProvider is used to generate event recorders that will be injected into Controllers // (and EventHandlers, Sources and Predicates). @@ -104,12 +106,6 @@ type controllerManager struct { // Healthz probe handler healthzHandler *healthz.Handler - mu sync.Mutex - started bool - startedLeader bool - healthzStarted bool - errChan chan error - // controllerOptions are the global controller options. controllerOptions v1alpha1.ControllerConfigurationSpec @@ -117,25 +113,20 @@ type controllerManager struct { // If none is set, it defaults to log.Log global logger. logger logr.Logger - // leaderElectionCancel is used to cancel the leader election. It is distinct from internalStopper, - // because for safety reasons we need to os.Exit() when we lose the leader election, meaning that - // it must be deferred until after gracefulShutdown is done. - leaderElectionCancel context.CancelFunc - // leaderElectionStopped is an internal channel used to signal the stopping procedure that the // LeaderElection.Run(...) function has returned and the shutdown can proceed. leaderElectionStopped chan struct{} - // stop procedure engaged. In other words, we should not add anything else to the manager - stopProcedureEngaged bool + // leaderElectionCancel is used to cancel the leader election. It is distinct from internalStopper, + // because for safety reasons we need to os.Exit() when we lose the leader election, meaning that + // it must be deferred until after gracefulShutdown is done. + leaderElectionCancel context.CancelFunc // elected is closed when this manager becomes the leader of a group of // managers, either because it won a leader election or because no leader // election was configured. elected chan struct{} - caches []hasCache - // port is the port that the webhook server serves at. port int // host is the hostname that the webhook server binds to. @@ -160,10 +151,6 @@ type controllerManager struct { // between tries of actions. retryPeriod time.Duration - // waitForRunnable is holding the number of runnables currently running so that - // we can wait for them to exit before quitting the manager - waitForRunnable sync.WaitGroup - // gracefulShutdownTimeout is the duration given to runnable to stop // before the manager actually returns on stop. gracefulShutdownTimeout time.Duration @@ -192,42 +179,17 @@ type hasCache interface { // Add sets dependencies on i, and adds it to the list of Runnables to start. func (cm *controllerManager) Add(r Runnable) error { - cm.mu.Lock() - defer cm.mu.Unlock() - if cm.stopProcedureEngaged { - return errors.New("can't accept new runnable as stop procedure is already engaged") - } + cm.Lock() + defer cm.Unlock() + return cm.add(r) +} +func (cm *controllerManager) add(r Runnable) error { // Set dependencies on the object if err := cm.SetFields(r); err != nil { return err } - - var shouldStart bool - - // Add the runnable to the leader election or the non-leaderelection list - if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() { - shouldStart = cm.started - cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r) - } else if hasCache, ok := r.(hasCache); ok { - cm.caches = append(cm.caches, hasCache) - if cm.started { - cm.startRunnable(hasCache) - if !hasCache.GetCache().WaitForCacheSync(cm.internalCtx) { - return fmt.Errorf("could not sync cache") - } - } - } else { - shouldStart = cm.startedLeader - cm.leaderElectionRunnables = append(cm.leaderElectionRunnables, r) - } - - if shouldStart { - // If already started, start the controller - cm.startRunnable(r) - } - - return nil + return cm.runnables.Add(r) } // Deprecated: use the equivalent Options field to set a field. This method will be removed in v0.10. @@ -250,13 +212,17 @@ func (cm *controllerManager) SetFields(i interface{}) error { // AddMetricsExtraHandler adds extra handler served on path to the http server that serves metrics. func (cm *controllerManager) AddMetricsExtraHandler(path string, handler http.Handler) error { + cm.Lock() + defer cm.Unlock() + + if cm.started { + return fmt.Errorf("unable to add new metrics handler because metrics endpoint has already been created") + } + if path == defaultMetricsEndpoint { return fmt.Errorf("overriding builtin %s endpoint is not allowed", defaultMetricsEndpoint) } - cm.mu.Lock() - defer cm.mu.Unlock() - if _, found := cm.metricsExtraHandlers[path]; found { return fmt.Errorf("can't register extra handler by duplicate path %q on metrics http server", path) } @@ -268,14 +234,10 @@ func (cm *controllerManager) AddMetricsExtraHandler(path string, handler http.Ha // AddHealthzCheck allows you to add Healthz checker. func (cm *controllerManager) AddHealthzCheck(name string, check healthz.Checker) error { - cm.mu.Lock() - defer cm.mu.Unlock() - - if cm.stopProcedureEngaged { - return errors.New("can't accept new healthCheck as stop procedure is already engaged") - } + cm.Lock() + defer cm.Unlock() - if cm.healthzStarted { + if cm.started { return fmt.Errorf("unable to add new checker because healthz endpoint has already been created") } @@ -289,15 +251,11 @@ func (cm *controllerManager) AddHealthzCheck(name string, check healthz.Checker) // AddReadyzCheck allows you to add Readyz checker. func (cm *controllerManager) AddReadyzCheck(name string, check healthz.Checker) error { - cm.mu.Lock() - defer cm.mu.Unlock() + cm.Lock() + defer cm.Unlock() - if cm.stopProcedureEngaged { - return errors.New("can't accept new ready check as stop procedure is already engaged") - } - - if cm.healthzStarted { - return fmt.Errorf("unable to add new checker because readyz endpoint has already been created") + if cm.started { + return fmt.Errorf("unable to add new checker because healthz endpoint has already been created") } if cm.readyzHandler == nil { @@ -350,7 +308,7 @@ func (cm *controllerManager) GetWebhookServer() *webhook.Server { } } if err := cm.Add(cm.webhookServer); err != nil { - panic("unable to add webhook server to the controller manager") + panic(fmt.Sprintf("unable to add webhook server to the controller manager: %s", err)) } }) return cm.webhookServer @@ -371,77 +329,89 @@ func (cm *controllerManager) serveMetrics() { // TODO(JoelSpeed): Use existing Kubernetes machinery for serving metrics mux := http.NewServeMux() mux.Handle(defaultMetricsEndpoint, handler) - - func() { - cm.mu.Lock() - defer cm.mu.Unlock() - - for path, extraHandler := range cm.metricsExtraHandlers { - mux.Handle(path, extraHandler) - } - }() - - server := http.Server{ - Handler: mux, + for path, extraHandler := range cm.metricsExtraHandlers { + mux.Handle(path, extraHandler) } - // Run the server - cm.startRunnable(RunnableFunc(func(_ context.Context) error { - cm.logger.Info("Starting metrics server", "path", defaultMetricsEndpoint) - if err := server.Serve(cm.metricsListener); err != nil && err != http.ErrServerClosed { - return err - } - return nil - })) - // Shutdown the server when stop is closed - <-cm.internalProceduresStop - if err := server.Shutdown(cm.shutdownCtx); err != nil { - cm.errChan <- err - } + server := httpserver.New(mux) + go cm.httpServe("metrics", cm.logger.WithValues("path", defaultMetricsEndpoint), server, cm.metricsListener) } func (cm *controllerManager) serveHealthProbes() { mux := http.NewServeMux() - server := http.Server{ - Handler: mux, + server := httpserver.New(mux) + + if cm.readyzHandler != nil { + mux.Handle(cm.readinessEndpointName, http.StripPrefix(cm.readinessEndpointName, cm.readyzHandler)) + // Append '/' suffix to handle subpaths + mux.Handle(cm.readinessEndpointName+"/", http.StripPrefix(cm.readinessEndpointName, cm.readyzHandler)) + } + if cm.healthzHandler != nil { + mux.Handle(cm.livenessEndpointName, http.StripPrefix(cm.livenessEndpointName, cm.healthzHandler)) + // Append '/' suffix to handle subpaths + mux.Handle(cm.livenessEndpointName+"/", http.StripPrefix(cm.livenessEndpointName, cm.healthzHandler)) } - func() { - cm.mu.Lock() - defer cm.mu.Unlock() + go cm.httpServe("health probe", cm.logger, server, cm.healthProbeListener) +} - if cm.readyzHandler != nil { - mux.Handle(cm.readinessEndpointName, http.StripPrefix(cm.readinessEndpointName, cm.readyzHandler)) - // Append '/' suffix to handle subpaths - mux.Handle(cm.readinessEndpointName+"/", http.StripPrefix(cm.readinessEndpointName, cm.readyzHandler)) - } - if cm.healthzHandler != nil { - mux.Handle(cm.livenessEndpointName, http.StripPrefix(cm.livenessEndpointName, cm.healthzHandler)) - // Append '/' suffix to handle subpaths - mux.Handle(cm.livenessEndpointName+"/", http.StripPrefix(cm.livenessEndpointName, cm.healthzHandler)) - } +func (cm *controllerManager) httpServe(kind string, log logr.Logger, server *http.Server, ln net.Listener) { + log = log.WithValues("kind", kind, "addr", ln.Addr()) - // Run server - cm.startRunnable(RunnableFunc(func(_ context.Context) error { - if err := server.Serve(cm.healthProbeListener); err != nil && err != http.ErrServerClosed { - return err + go func() { + log.Info("Starting server") + if err := server.Serve(ln); err != nil { + if errors.Is(err, http.ErrServerClosed) { + return } - return nil - })) - cm.healthzStarted = true + if atomic.LoadInt64(cm.stopProcedureEngaged) > 0 { + // There might be cases where connections are still open and we try to shutdown + // but not having enough time to close the connection causes an error in Serve + // + // In that case we want to avoid returning an error to the main error channel. + log.Error(err, "error on Serve after stop has been engaged") + return + } + cm.errChan <- err + } }() - // Shutdown the server when stop is closed + // Shutdown the server when stop is closed. <-cm.internalProceduresStop if err := server.Shutdown(cm.shutdownCtx); err != nil { + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + // Avoid logging context related errors. + return + } + if atomic.LoadInt64(cm.stopProcedureEngaged) > 0 { + cm.logger.Error(err, "error on Shutdown after stop has been engaged") + return + } cm.errChan <- err } } +// Start starts the manager and waits indefinitely. +// There is only two ways to have start return: +// An error has occurred during in one of the internal operations, +// such as leader election, cache start, webhooks, and so on. +// Or, the context is cancelled. func (cm *controllerManager) Start(ctx context.Context) (err error) { - if err := cm.Add(cm.cluster); err != nil { - return fmt.Errorf("failed to add cluster to runnables: %w", err) + cm.Lock() + if cm.started { + cm.Unlock() + return errors.New("manager already started") } + var ready bool + defer func() { + // Only unlock the manager if we haven't reached + // the internal readiness condition. + if !ready { + cm.Unlock() + } + }() + + // Initialize the internal context. cm.internalCtx, cm.internalCancel = context.WithCancel(ctx) // This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request @@ -463,40 +433,70 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { } }() - // initialize this here so that we reset the signal channel state on every start - // Everything that might write into this channel must be started in a new goroutine, - // because otherwise we might block this routine trying to write into the full channel - // and will not be able to enter the deferred cm.engageStopProcedure() which drains - // it. - cm.errChan = make(chan error) + // Add the cluster runnable. + if err := cm.add(cm.cluster); err != nil { + return fmt.Errorf("failed to add cluster to runnables: %w", err) + } // Metrics should be served whether the controller is leader or not. // (If we don't serve metrics for non-leaders, prometheus will still scrape - // the pod but will get a connection refused) + // the pod but will get a connection refused). if cm.metricsListener != nil { - go cm.serveMetrics() + cm.serveMetrics() } - // Serve health probes + // Serve health probes. if cm.healthProbeListener != nil { - go cm.serveHealthProbes() + cm.serveHealthProbes() + } + + // First start any webhook servers, which includes conversion, validation, and defaulting + // webhooks that are registered. + // + // WARNING: Webhooks MUST start before any cache is populated, otherwise there is a race condition + // between conversion webhooks and the cache sync (usually initial list) which causes the webhooks + // to never start because no cache can be populated. + if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil { + if err != wait.ErrWaitTimeout { + return err + } } - go cm.startNonLeaderElectionRunnables() + // Start and wait for caches. + if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil { + if err != wait.ErrWaitTimeout { + return err + } + } - go func() { - if cm.resourceLock != nil { - err := cm.startLeaderElection() - if err != nil { - cm.errChan <- err - } - } else { - // Treat not having leader election enabled the same as being elected. - cm.startLeaderElectionRunnables() - close(cm.elected) + // Start the non-leaderelection Runnables after the cache has synced. + if err := cm.runnables.Others.Start(cm.internalCtx); err != nil { + if err != wait.ErrWaitTimeout { + return err } - }() + } + // Start the leader election and all required runnables. + { + ctx, cancel := context.WithCancel(context.Background()) + cm.leaderElectionCancel = cancel + go func() { + if cm.resourceLock != nil { + if err := cm.startLeaderElection(ctx); err != nil { + cm.errChan <- err + } + } else { + // Treat not having leader election enabled the same as being elected. + if err := cm.startLeaderElectionRunnables(); err != nil { + cm.errChan <- err + } + close(cm.elected) + } + }() + } + + ready = true + cm.Unlock() select { case <-ctx.Done(): // We are done @@ -510,24 +510,31 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { // engageStopProcedure signals all runnables to stop, reads potential errors // from the errChan and waits for them to end. It must not be called more than once. func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) error { - // Populate the shutdown context. - var shutdownCancel context.CancelFunc - if cm.gracefulShutdownTimeout > 0 { - cm.shutdownCtx, shutdownCancel = context.WithTimeout(context.Background(), cm.gracefulShutdownTimeout) - } else { - cm.shutdownCtx, shutdownCancel = context.WithCancel(context.Background()) + if !atomic.CompareAndSwapInt64(cm.stopProcedureEngaged, 0, 1) { + return errors.New("stop procedure already engaged") } - defer shutdownCancel() - // Cancel the internal stop channel and wait for the procedures to stop and complete. - close(cm.internalProceduresStop) - cm.internalCancel() + // Populate the shutdown context, this operation MUST be done before + // closing the internalProceduresStop channel. + // + // The shutdown context immediately expires if the gracefulShutdownTimeout is not set. + var shutdownCancel context.CancelFunc + cm.shutdownCtx, shutdownCancel = context.WithTimeout(context.Background(), cm.gracefulShutdownTimeout) + defer shutdownCancel() // Start draining the errors before acquiring the lock to make sure we don't deadlock // if something that has the lock is blocked on trying to write into the unbuffered // channel after something else already wrote into it. + var closeOnce sync.Once go func() { for { + // Closing in the for loop is required to avoid race conditions between + // the closure of all internal procedures and making sure to have a reader off the error channel. + closeOnce.Do(func() { + // Cancel the internal stop channel and wait for the procedures to stop and complete. + close(cm.internalProceduresStop) + cm.internalCancel() + }) select { case err, ok := <-cm.errChan: if ok { @@ -538,26 +545,14 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e } } }() - if cm.gracefulShutdownTimeout == 0 { - return nil - } - cm.mu.Lock() - defer cm.mu.Unlock() - cm.stopProcedureEngaged = true - // we want to close this after the other runnables stop, because we don't + // We want to close this after the other runnables stop, because we don't // want things like leader election to try and emit events on a closed // channel defer cm.recorderProvider.Stop(cm.shutdownCtx) - return cm.waitForRunnableToEnd(shutdownCancel) -} - -// waitForRunnableToEnd blocks until all runnables ended or the -// tearDownTimeout was reached. In the latter case, an error is returned. -func (cm *controllerManager) waitForRunnableToEnd(shutdownCancel context.CancelFunc) (retErr error) { - // Cancel leader election only after we waited. It will os.Exit() the app for safety. defer func() { - if retErr == nil && cm.leaderElectionCancel != nil { + // Cancel leader election only after we waited. It will os.Exit() the app for safety. + if cm.resourceLock != nil { // After asking the context to be cancelled, make sure // we wait for the leader stopped channel to be closed, otherwise // we might encounter race conditions between this code @@ -568,102 +563,48 @@ func (cm *controllerManager) waitForRunnableToEnd(shutdownCancel context.CancelF }() go func() { - cm.waitForRunnable.Wait() + // First stop the non-leader election runnables. + cm.logger.Info("Stopping and waiting for non leader election runnables") + cm.runnables.Others.StopAndWait(cm.shutdownCtx) + + // Stop all the leader election runnables, which includes reconcilers. + cm.logger.Info("Stopping and waiting for leader election runnables") + cm.runnables.LeaderElection.StopAndWait(cm.shutdownCtx) + + // Stop the caches before the leader election runnables, this is an important + // step to make sure that we don't race with the reconcilers by receiving more events + // from the API servers and enqueueing them. + cm.logger.Info("Stopping and waiting for caches") + cm.runnables.Caches.StopAndWait(cm.shutdownCtx) + + // Webhooks should come last, as they might be still serving some requests. + cm.logger.Info("Stopping and waiting for webhooks") + cm.runnables.Webhooks.StopAndWait(cm.shutdownCtx) + + // Proceed to close the manager and overall shutdown context. + cm.logger.Info("Wait completed, proceeding to shutdown the manager") shutdownCancel() }() <-cm.shutdownCtx.Done() if err := cm.shutdownCtx.Err(); err != nil && err != context.Canceled { - return fmt.Errorf("failed waiting for all runnables to end within grace period of %s: %w", cm.gracefulShutdownTimeout, err) - } - return nil -} - -func (cm *controllerManager) startNonLeaderElectionRunnables() { - cm.mu.Lock() - defer cm.mu.Unlock() - - // First start any webhook servers, which includes conversion, validation, and defaulting - // webhooks that are registered. - // - // WARNING: Webhooks MUST start before any cache is populated, otherwise there is a race condition - // between conversion webhooks and the cache sync (usually initial list) which causes the webhooks - // to never start because no cache can be populated. - for _, c := range cm.nonLeaderElectionRunnables { - if _, ok := c.(*webhook.Server); ok { - cm.startRunnable(c) - } - } - - // Start and wait for caches. - cm.waitForCache(cm.internalCtx) - - // Start the non-leaderelection Runnables after the cache has synced - for _, c := range cm.nonLeaderElectionRunnables { - if _, ok := c.(*webhook.Server); ok { - continue + if errors.Is(err, context.DeadlineExceeded) { + if cm.gracefulShutdownTimeout > 0 { + return fmt.Errorf("failed waiting for all runnables to end within grace period of %s: %w", cm.gracefulShutdownTimeout, err) + } + return nil } - - // Controllers block, but we want to return an error if any have an error starting. - // Write any Start errors to a channel so we can return them - cm.startRunnable(c) - } -} - -func (cm *controllerManager) startLeaderElectionRunnables() { - cm.mu.Lock() - defer cm.mu.Unlock() - - cm.waitForCache(cm.internalCtx) - - // Start the leader election Runnables after the cache has synced - for _, c := range cm.leaderElectionRunnables { - // Controllers block, but we want to return an error if any have an error starting. - // Write any Start errors to a channel so we can return them - cm.startRunnable(c) + // For any other error, return the error. + return err } - - cm.startedLeader = true + return nil } -func (cm *controllerManager) waitForCache(ctx context.Context) { - if cm.started { - return - } - - for _, cache := range cm.caches { - cm.startRunnable(cache) - } - - // Wait for the caches to sync. - // TODO(community): Check the return value and write a test - for _, cache := range cm.caches { - cache.GetCache().WaitForCacheSync(ctx) - } - // TODO: This should be the return value of cm.cache.WaitForCacheSync but we abuse - // cm.started as check if we already started the cache so it must always become true. - // Making sure that the cache doesn't get started twice is needed to not get a "close - // of closed channel" panic - cm.started = true +func (cm *controllerManager) startLeaderElectionRunnables() error { + return cm.runnables.LeaderElection.Start(cm.internalCtx) } -func (cm *controllerManager) startLeaderElection() (err error) { - ctx, cancel := context.WithCancel(context.Background()) - cm.mu.Lock() - cm.leaderElectionCancel = cancel - cm.mu.Unlock() - - if cm.onStoppedLeading == nil { - cm.onStoppedLeading = func() { - // Make sure graceful shutdown is skipped if we lost the leader lock without - // intending to. - cm.gracefulShutdownTimeout = time.Duration(0) - // Most implementations of leader election log.Fatal() here. - // Since Start is wrapped in log.Fatal when called, we can just return - // an error here which will cause the program to exit. - cm.errChan <- errors.New("leader election lost") - } - } +func (cm *controllerManager) startLeaderElection(ctx context.Context) (err error) { l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ Lock: cm.resourceLock, LeaseDuration: cm.leaseDuration, @@ -671,10 +612,24 @@ func (cm *controllerManager) startLeaderElection() (err error) { RetryPeriod: cm.retryPeriod, Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: func(_ context.Context) { - cm.startLeaderElectionRunnables() + if err := cm.startLeaderElectionRunnables(); err != nil { + cm.errChan <- err + return + } close(cm.elected) }, - OnStoppedLeading: cm.onStoppedLeading, + OnStoppedLeading: func() { + if cm.onStoppedLeading != nil { + cm.onStoppedLeading() + } + // Make sure graceful shutdown is skipped if we lost the leader lock without + // intending to. + cm.gracefulShutdownTimeout = time.Duration(0) + // Most implementations of leader election log.Fatal() here. + // Since Start is wrapped in log.Fatal when called, we can just return + // an error here which will cause the program to exit. + cm.errChan <- errors.New("leader election lost") + }, }, ReleaseOnCancel: cm.leaderElectionReleaseOnCancel, }) @@ -694,13 +649,3 @@ func (cm *controllerManager) startLeaderElection() (err error) { func (cm *controllerManager) Elected() <-chan struct{} { return cm.elected } - -func (cm *controllerManager) startRunnable(r Runnable) { - cm.waitForRunnable.Add(1) - go func() { - defer cm.waitForRunnable.Done() - if err := r.Start(cm.internalCtx); err != nil { - cm.errChan <- err - } - }() -} diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index d8c8e01c51..05edcdbb5a 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -31,6 +31,7 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/client-go/tools/record" + "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/cluster" @@ -365,8 +366,14 @@ func New(config *rest.Config, options Options) (Manager, error) { return nil, err } + errChan := make(chan error) + runnables := newRunnables(errChan) + return &controllerManager{ + stopProcedureEngaged: pointer.Int64(0), cluster: cluster, + runnables: runnables, + errChan: errChan, recorderProvider: recorderProvider, resourceLock: resourceLock, metricsListener: metricsListener, diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 2cb2c72560..0dd1cc1f5b 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -306,7 +306,7 @@ var _ = Describe("manger.Manager", func() { Expect(m.Start(ctx)).To(BeNil()) close(mgrDone) }() - <-cm.elected + <-cm.Elected() cancel() select { case <-leaderElectionDone: @@ -335,7 +335,7 @@ var _ = Describe("manger.Manager", func() { defer GinkgoRecover() err := m.Start(ctx) Expect(err).ToNot(BeNil()) - Expect(err.Error()).To(Equal("leader election lost")) + Expect(err.Error()).To(ContainSubstring("leader election lost")) close(mgrDone) }() cm := m.(*controllerManager) @@ -401,8 +401,8 @@ var _ = Describe("manger.Manager", func() { defer GinkgoRecover() Expect(m1.Elected()).ShouldNot(BeClosed()) Expect(m1.Start(ctx1)).NotTo(HaveOccurred()) - Expect(m1.Elected()).Should(BeClosed()) }() + <-m1.Elected() <-c1 c2 := make(chan struct{}) @@ -435,6 +435,7 @@ var _ = Describe("manger.Manager", func() { Expect(m).To(BeNil()) Expect(err).To(MatchError(ContainSubstring("expected error"))) }) + It("should return an error if namespace not set and not running in cluster", func() { m, err := New(cfg, Options{LeaderElection: true, LeaderElectionID: "controller-runtime"}) Expect(m).To(BeNil()) @@ -609,9 +610,9 @@ var _ = Describe("manger.Manager", func() { defer GinkgoRecover() Expect(m.Elected()).ShouldNot(BeClosed()) Expect(m.Start(ctx)).NotTo(HaveOccurred()) - Expect(m.Elected()).Should(BeClosed()) }() + <-m.Elected() wgRunnableStarted.Wait() }) @@ -653,7 +654,9 @@ var _ = Describe("manger.Manager", func() { } mgr, ok := m.(*controllerManager) Expect(ok).To(BeTrue()) - mgr.caches = []hasCache{&cacheProvider{cache: &informertest.FakeInformers{Error: fmt.Errorf("expected error")}}} + Expect(mgr.Add( + &cacheProvider{cache: &informertest.FakeInformers{Error: fmt.Errorf("expected error")}}, + )).To(Succeed()) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -672,14 +675,15 @@ var _ = Describe("manger.Manager", func() { } runnableWasStarted := make(chan struct{}) - Expect(m.Add(RunnableFunc(func(ctx context.Context) error { + runnable := RunnableFunc(func(ctx context.Context) error { defer GinkgoRecover() if !fakeCache.wasSynced { return errors.New("runnable got started before cache was synced") } close(runnableWasStarted) return nil - }))).To(Succeed()) + }) + Expect(m.Add(runnable)).To(Succeed()) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -801,8 +805,11 @@ var _ = Describe("manger.Manager", func() { Expect(err).NotTo(HaveOccurred()) Expect(m.Add(fakeCluster)).NotTo(HaveOccurred()) - Expect(fakeCluster.informer.wasStarted).To(BeTrue()) - Expect(fakeCluster.informer.wasSynced).To(BeTrue()) + Eventually(func() bool { + fakeCluster.informer.mu.Lock() + defer fakeCluster.informer.mu.Unlock() + return fakeCluster.informer.wasStarted && fakeCluster.informer.wasSynced + }).Should(BeTrue()) }) It("should wait for runnables to stop", func() { @@ -1029,10 +1036,11 @@ var _ = Describe("manger.Manager", func() { ctx, cancel := context.WithCancel(context.Background()) managerStopDone := make(chan struct{}) go func() { + defer GinkgoRecover() Expect(m.Start(ctx)).NotTo(HaveOccurred()) close(managerStopDone) }() - <-m.(*controllerManager).elected + <-m.Elected() cancel() <-managerStopDone @@ -1119,6 +1127,7 @@ var _ = Describe("manger.Manager", func() { defer GinkgoRecover() Expect(m.Start(ctx)).NotTo(HaveOccurred()) }() + <-m.Elected() metricsEndpoint := fmt.Sprintf("http://%s/metrics", listener.Addr().String()) resp, err := http.Get(metricsEndpoint) @@ -1137,10 +1146,12 @@ var _ = Describe("manger.Manager", func() { defer GinkgoRecover() Expect(m.Start(ctx)).NotTo(HaveOccurred()) }() + <-m.Elected() endpoint := fmt.Sprintf("http://%s/should-not-exist", listener.Addr().String()) resp, err := http.Get(endpoint) Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() Expect(resp.StatusCode).To(Equal(404)) }) @@ -1163,10 +1174,12 @@ var _ = Describe("manger.Manager", func() { defer GinkgoRecover() Expect(m.Start(ctx)).NotTo(HaveOccurred()) }() + <-m.Elected() metricsEndpoint := fmt.Sprintf("http://%s/metrics", listener.Addr().String()) resp, err := http.Get(metricsEndpoint) Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() Expect(resp.StatusCode).To(Equal(200)) data, err := ioutil.ReadAll(resp.Body) @@ -1204,10 +1217,12 @@ var _ = Describe("manger.Manager", func() { defer GinkgoRecover() Expect(m.Start(ctx)).NotTo(HaveOccurred()) }() + <-m.Elected() endpoint := fmt.Sprintf("http://%s/debug", listener.Addr().String()) resp, err := http.Get(endpoint) Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() Expect(resp.StatusCode).To(Equal(http.StatusOK)) body, err := ioutil.ReadAll(resp.Body) @@ -1248,6 +1263,7 @@ var _ = Describe("manger.Manager", func() { defer GinkgoRecover() Expect(m.Start(ctx)).NotTo(HaveOccurred()) }() + <-m.Elected() // Check the health probes started endpoint := fmt.Sprintf("http://%s", listener.Addr().String()) @@ -1261,7 +1277,7 @@ var _ = Describe("manger.Manager", func() { Eventually(func() error { _, err = http.Get(endpoint) return err - }).ShouldNot(Succeed()) + }, 10*time.Second).ShouldNot(Succeed()) }) It("should serve readiness endpoint", func() { @@ -1280,18 +1296,21 @@ var _ = Describe("manger.Manager", func() { defer GinkgoRecover() Expect(m.Start(ctx)).NotTo(HaveOccurred()) }() + <-m.Elected() readinessEndpoint := fmt.Sprint("http://", listener.Addr().String(), defaultReadinessEndpoint) // Controller is not ready resp, err := http.Get(readinessEndpoint) Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() Expect(resp.StatusCode).To(Equal(http.StatusInternalServerError)) // Controller is ready res = nil resp, err = http.Get(readinessEndpoint) Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() Expect(resp.StatusCode).To(Equal(http.StatusOK)) // Check readiness path without trailing slash without redirect @@ -1304,6 +1323,7 @@ var _ = Describe("manger.Manager", func() { } resp, err = httpClient.Get(readinessEndpoint) Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() Expect(resp.StatusCode).To(Equal(http.StatusOK)) // Check readiness path for individual check @@ -1311,6 +1331,7 @@ var _ = Describe("manger.Manager", func() { res = nil resp, err = http.Get(readinessEndpoint) Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() Expect(resp.StatusCode).To(Equal(http.StatusOK)) }) @@ -1330,18 +1351,21 @@ var _ = Describe("manger.Manager", func() { defer GinkgoRecover() Expect(m.Start(ctx)).NotTo(HaveOccurred()) }() + <-m.Elected() livenessEndpoint := fmt.Sprint("http://", listener.Addr().String(), defaultLivenessEndpoint) // Controller is not ready resp, err := http.Get(livenessEndpoint) Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() Expect(resp.StatusCode).To(Equal(http.StatusInternalServerError)) // Controller is ready res = nil resp, err = http.Get(livenessEndpoint) Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() Expect(resp.StatusCode).To(Equal(http.StatusOK)) // Check liveness path without trailing slash without redirect @@ -1354,6 +1378,7 @@ var _ = Describe("manger.Manager", func() { } resp, err = httpClient.Get(livenessEndpoint) Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() Expect(resp.StatusCode).To(Equal(http.StatusOK)) // Check readiness path for individual check @@ -1361,6 +1386,7 @@ var _ = Describe("manger.Manager", func() { res = nil resp, err = http.Get(livenessEndpoint) Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() Expect(resp.StatusCode).To(Equal(http.StatusOK)) }) }) @@ -1387,12 +1413,11 @@ var _ = Describe("manger.Manager", func() { defer GinkgoRecover() Expect(m.Start(ctx)).NotTo(HaveOccurred()) }() + <-m.Elected() // Wait for the Manager to start Eventually(func() bool { - mgr.mu.Lock() - defer mgr.mu.Unlock() - return mgr.started + return mgr.runnables.Caches.Started() }).Should(BeTrue()) // Add another component after starting @@ -1421,9 +1446,7 @@ var _ = Describe("manger.Manager", func() { // Wait for the Manager to start Eventually(func() bool { - mgr.mu.Lock() - defer mgr.mu.Unlock() - return mgr.started + return mgr.runnables.Caches.Started() }).Should(BeTrue()) c1 := make(chan struct{}) @@ -1577,6 +1600,8 @@ var _ = Describe("manger.Manager", func() { defer close(doneCh) Expect(m.Start(ctx)).To(Succeed()) }() + <-m.Elected() + Eventually(func() *corev1.Event { evts, err := clientset.CoreV1().Events("").Search(m.GetScheme(), &ns) Expect(err).NotTo(HaveOccurred()) @@ -1765,11 +1790,12 @@ func (c *cacheProvider) Start(ctx context.Context) error { } type startSignalingInformer struct { + mu sync.Mutex + // The manager calls Start and WaitForCacheSync in // parallel, so we have to protect wasStarted with a Mutex // and block in WaitForCacheSync until it is true. - wasStartedLock sync.Mutex - wasStarted bool + wasStarted bool // was synced will be true once Start was called and // WaitForCacheSync returned, just like a real cache. wasSynced bool @@ -1777,24 +1803,23 @@ type startSignalingInformer struct { } func (c *startSignalingInformer) started() bool { - c.wasStartedLock.Lock() - defer c.wasStartedLock.Unlock() + c.mu.Lock() + defer c.mu.Unlock() return c.wasStarted } func (c *startSignalingInformer) Start(ctx context.Context) error { - c.wasStartedLock.Lock() + c.mu.Lock() c.wasStarted = true - c.wasStartedLock.Unlock() + c.mu.Unlock() return c.Cache.Start(ctx) } func (c *startSignalingInformer) WaitForCacheSync(ctx context.Context) bool { defer func() { - for !c.started() { - continue - } + c.mu.Lock() c.wasSynced = true + c.mu.Unlock() }() return c.Cache.WaitForCacheSync(ctx) } diff --git a/pkg/manager/runnable_group.go b/pkg/manager/runnable_group.go new file mode 100644 index 0000000000..ded8aed221 --- /dev/null +++ b/pkg/manager/runnable_group.go @@ -0,0 +1,296 @@ +package manager + +import ( + "context" + "errors" + "sync" + + "sigs.k8s.io/controller-runtime/pkg/webhook" +) + +var ( + errRunnableGroupStopped = errors.New("can't accept new runnable as stop procedure is already engaged") +) + +// readyRunnable encapsulates a runnable with +// a ready check. +type readyRunnable struct { + Runnable + Check runnableCheck + signalReady bool +} + +// runnableCheck can be passed to Add() to let the runnable group determine that a +// runnable is ready. A runnable check should block until a runnable is ready, +// if the returned result is false, the runnable is considered not ready and failed. +type runnableCheck func(ctx context.Context) bool + +// runnables handles all the runnables for a manager by grouping them accordingly to their +// type (webhooks, caches etc.). +type runnables struct { + Webhooks *runnableGroup + Caches *runnableGroup + LeaderElection *runnableGroup + Others *runnableGroup +} + +// newRunnables creates a new runnables object. +func newRunnables(errChan chan error) *runnables { + return &runnables{ + Webhooks: newRunnableGroup(errChan), + Caches: newRunnableGroup(errChan), + LeaderElection: newRunnableGroup(errChan), + Others: newRunnableGroup(errChan), + } +} + +// Add adds a runnable to closest group of runnable that they belong to. +// +// Add should be able to be called before and after Start, but not after StopAndWait. +// Add should return an error when called during StopAndWait. +// The runnables added before Start are started when Start is called. +// The runnables added after Start are started directly. +func (r *runnables) Add(fn Runnable) error { + switch runnable := fn.(type) { + case hasCache: + return r.Caches.Add(fn, func(ctx context.Context) bool { + return runnable.GetCache().WaitForCacheSync(ctx) + }) + case *webhook.Server: + return r.Webhooks.Add(fn, nil) + case LeaderElectionRunnable: + if !runnable.NeedLeaderElection() { + return r.Others.Add(fn, nil) + } + return r.LeaderElection.Add(fn, nil) + default: + return r.LeaderElection.Add(fn, nil) + } +} + +// runnableGroup manages a group of runnables that are +// meant to be running together until StopAndWait is called. +// +// Runnables can be added to a group after the group has started +// but not after it's stopped or while shutting down. +type runnableGroup struct { + ctx context.Context + cancel context.CancelFunc + + start sync.Mutex + startOnce sync.Once + started bool + startQueue []*readyRunnable + startReadyCh chan *readyRunnable + + stop sync.RWMutex + stopOnce sync.Once + stopped bool + + // errChan is the error channel passed by the caller + // when the group is created. + // All errors are forwarded to this channel once they occur. + errChan chan error + + // ch is the internal channel where the runnables are read off from. + ch chan *readyRunnable + + // wg is an internal sync.WaitGroup that allows us to properly stop + // and wait for all the runnables to finish before returning. + wg *sync.WaitGroup +} + +func newRunnableGroup(errChan chan error) *runnableGroup { + r := &runnableGroup{ + startReadyCh: make(chan *readyRunnable), + errChan: errChan, + ch: make(chan *readyRunnable), + wg: new(sync.WaitGroup), + } + r.ctx, r.cancel = context.WithCancel(context.Background()) + return r +} + +// Started returns true if the group has started. +func (r *runnableGroup) Started() bool { + r.start.Lock() + defer r.start.Unlock() + return r.started +} + +// Start starts the group and waits for all +// initially registered runnables to start. +// It can only be called once, subsequent calls have no effect. +func (r *runnableGroup) Start(ctx context.Context) error { + var retErr error + + r.startOnce.Do(func() { + defer close(r.startReadyCh) + + // Start the internal reconciler. + go r.reconcile() + + // Start the group and queue up all + // the runnables that were added prior. + r.start.Lock() + r.started = true + for _, rn := range r.startQueue { + rn.signalReady = true + r.ch <- rn + } + r.start.Unlock() + + // If we don't have any queue, return. + if len(r.startQueue) == 0 { + return + } + + // Wait for all runnables to signal. + for { + select { + case <-ctx.Done(): + if err := ctx.Err(); !errors.Is(err, context.Canceled) { + retErr = err + } + case rn := <-r.startReadyCh: + for i, existing := range r.startQueue { + if existing == rn { + // Remove the item from the start queue. + r.startQueue = append(r.startQueue[:i], r.startQueue[i+1:]...) + break + } + } + // We're done waiting if the queue is empty, return. + if len(r.startQueue) == 0 { + return + } + } + } + }) + + return retErr +} + +// reconcile is our main entrypoint for every runnable added +// to this group. Its primary job is to read off the internal channel +// and schedule runnables while tracking their state. +func (r *runnableGroup) reconcile() { + for runnable := range r.ch { + // Handle stop. + // If the shutdown has been called we want to avoid + // adding new goroutines to the WaitGroup because Wait() + // panics if Add() is called after it. + { + r.stop.RLock() + if r.stopped { + // Drop any runnables if we're stopped. + r.errChan <- errRunnableGroupStopped + r.stop.RUnlock() + continue + } + + // Why is this here? + // When StopAndWait is called, if a runnable is in the process + // of being added, we could end up in a situation where + // the WaitGroup is incremented while StopAndWait has called Wait(), + // which would result in a panic. + r.wg.Add(1) + r.stop.RUnlock() + } + + // Start the runnable. + go func(rn *readyRunnable) { + go func() { + if rn.Check(r.ctx) { + if rn.signalReady { + r.startReadyCh <- rn + } + } + }() + + // If we return, the runnable ended cleanly + // or returned an error to the channel. + // + // We should always decrement the WaitGroup here. + defer r.wg.Done() + + // Start the runnable. + if err := rn.Start(r.ctx); err != nil { + r.errChan <- err + } + }(runnable) + } +} + +// Add should be able to be called before and after Start, but not after StopAndWait. +// Add should return an error when called during StopAndWait. +func (r *runnableGroup) Add(rn Runnable, ready runnableCheck) error { + r.stop.RLock() + if r.stopped { + r.stop.RUnlock() + return errRunnableGroupStopped + } + r.stop.RUnlock() + + if ready == nil { + ready = func(_ context.Context) bool { return true } + } + + readyRunnable := &readyRunnable{ + Runnable: rn, + Check: ready, + } + + // Handle start. + // If the overall runnable group isn't started yet + // we want to buffer the runnables and let Start() + // queue them up again later. + { + r.start.Lock() + + // Check if we're already started. + if !r.started { + // Store the runnable in the internal if not. + r.startQueue = append(r.startQueue, readyRunnable) + r.start.Unlock() + return nil + } + r.start.Unlock() + } + + // Enqueue the runnable. + r.ch <- readyRunnable + return nil +} + +// StopAndWait waits for all the runnables to finish before returning. +func (r *runnableGroup) StopAndWait(ctx context.Context) { + r.stopOnce.Do(func() { + // Close the reconciler channel once we're done. + defer close(r.ch) + + _ = r.Start(ctx) + r.stop.Lock() + // Store the stopped variable so we don't accept any new + // runnables for the time being. + r.stopped = true + r.stop.Unlock() + + // Cancel the internal channel. + r.cancel() + + done := make(chan struct{}) + go func() { + defer close(done) + // Wait for all the runnables to finish. + r.wg.Wait() + }() + + select { + case <-done: + // We're done, exit. + case <-ctx.Done(): + // Calling context has expired, exit. + } + }) +} diff --git a/pkg/manager/runnable_group_test.go b/pkg/manager/runnable_group_test.go new file mode 100644 index 0000000000..57e0ad6387 --- /dev/null +++ b/pkg/manager/runnable_group_test.go @@ -0,0 +1,182 @@ +package manager + +import ( + "context" + "errors" + "fmt" + "sync/atomic" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/cache/informertest" + "sigs.k8s.io/controller-runtime/pkg/webhook" +) + +var _ = Describe("runnables", func() { + errCh := make(chan error) + + It("should be able to create a new runnables object", func() { + Expect(newRunnables(errCh)).ToNot(BeNil()) + }) + + It("should add caches to the appropriate group", func() { + cache := &cacheProvider{cache: &informertest.FakeInformers{Error: fmt.Errorf("expected error")}} + r := newRunnables(errCh) + Expect(r.Add(cache)).To(Succeed()) + Expect(r.Caches.startQueue).To(HaveLen(1)) + }) + + It("should add webhooks to the appropriate group", func() { + webhook := &webhook.Server{} + r := newRunnables(errCh) + Expect(r.Add(webhook)).To(Succeed()) + Expect(r.Webhooks.startQueue).To(HaveLen(1)) + }) + + It("should add any runnable to the leader election group", func() { + err := errors.New("runnable func") + runnable := RunnableFunc(func(c context.Context) error { + return err + }) + + r := newRunnables(errCh) + Expect(r.Add(runnable)).To(Succeed()) + Expect(r.LeaderElection.startQueue).To(HaveLen(1)) + }) +}) + +var _ = Describe("runnableGroup", func() { + errCh := make(chan error) + + It("should be able to add new runnables before it starts", func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + rg := newRunnableGroup(errCh) + Expect(rg.Add(RunnableFunc(func(c context.Context) error { + <-ctx.Done() + return nil + }), nil)).To(Succeed()) + + Expect(rg.Started()).To(BeFalse()) + }) + + It("should be able to add new runnables before and after start", func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + rg := newRunnableGroup(errCh) + Expect(rg.Add(RunnableFunc(func(c context.Context) error { + <-ctx.Done() + return nil + }), nil)).To(Succeed()) + Expect(rg.Start(ctx)).To(Succeed()) + Expect(rg.Started()).To(BeTrue()) + Expect(rg.Add(RunnableFunc(func(c context.Context) error { + <-ctx.Done() + return nil + }), nil)).To(Succeed()) + }) + + It("should be able to add new runnables before and after start concurrently", func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + rg := newRunnableGroup(errCh) + + go func() { + defer GinkgoRecover() + <-time.After(50 * time.Millisecond) + Expect(rg.Start(ctx)).To(Succeed()) + }() + + for i := 0; i < 20; i++ { + go func(i int) { + defer GinkgoRecover() + + <-time.After(time.Duration(i) * 10 * time.Millisecond) + Expect(rg.Add(RunnableFunc(func(c context.Context) error { + <-ctx.Done() + return nil + }), nil)).To(Succeed()) + }(i) + } + }) + + It("should be able to close the group and wait for all runnables to finish", func() { + ctx, cancel := context.WithCancel(context.Background()) + + exited := pointer.Int64(0) + rg := newRunnableGroup(errCh) + for i := 0; i < 10; i++ { + Expect(rg.Add(RunnableFunc(func(c context.Context) error { + defer atomic.AddInt64(exited, 1) + <-ctx.Done() + <-time.After(time.Duration(i) * 10 * time.Millisecond) + return nil + }), nil)).To(Succeed()) + } + Expect(rg.Start(ctx)).To(Succeed()) + + // Cancel the context, asking the runnables to exit. + cancel() + rg.StopAndWait(context.Background()) + + Expect(rg.Add(RunnableFunc(func(c context.Context) error { + return nil + }), nil)).ToNot(Succeed()) + + Expect(atomic.LoadInt64(exited)).To(BeNumerically("==", 10)) + }) + + It("should be able to wait for all runnables to be ready at different intervals", func() { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + rg := newRunnableGroup(errCh) + + go func() { + defer GinkgoRecover() + <-time.After(50 * time.Millisecond) + Expect(rg.Start(ctx)).To(Succeed()) + }() + + for i := 0; i < 20; i++ { + go func(i int) { + defer GinkgoRecover() + + Expect(rg.Add(RunnableFunc(func(c context.Context) error { + <-ctx.Done() + return nil + }), func(_ context.Context) bool { + <-time.After(time.Duration(i) * 10 * time.Millisecond) + return true + })).To(Succeed()) + }(i) + } + }) + + It("should not turn ready if some readiness check fail", func() { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + rg := newRunnableGroup(errCh) + + go func() { + defer GinkgoRecover() + <-time.After(50 * time.Millisecond) + Expect(rg.Start(ctx)).To(Succeed()) + }() + + for i := 0; i < 20; i++ { + go func(i int) { + defer GinkgoRecover() + + Expect(rg.Add(RunnableFunc(func(c context.Context) error { + <-ctx.Done() + return nil + }), func(_ context.Context) bool { + <-time.After(time.Duration(i) * 10 * time.Millisecond) + return i%2 == 0 // Return false readiness all uneven indexes. + })).To(Succeed()) + }(i) + } + }) +}) diff --git a/pkg/webhook/server.go b/pkg/webhook/server.go index 1db38113f7..364d0f902e 100644 --- a/pkg/webhook/server.go +++ b/pkg/webhook/server.go @@ -34,6 +34,7 @@ import ( kscheme "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/certwatcher" "sigs.k8s.io/controller-runtime/pkg/healthz" + "sigs.k8s.io/controller-runtime/pkg/internal/httpserver" "sigs.k8s.io/controller-runtime/pkg/runtime/inject" "sigs.k8s.io/controller-runtime/pkg/webhook/internal/metrics" ) @@ -261,9 +262,7 @@ func (s *Server) Start(ctx context.Context) error { log.Info("Serving webhook server", "host", s.Host, "port", s.Port) - srv := &http.Server{ - Handler: s.WebhookMux, - } + srv := httpserver.New(s.WebhookMux) idleConnsClosed := make(chan struct{}) go func() { diff --git a/pkg/webhook/webhook_integration_test.go b/pkg/webhook/webhook_integration_test.go index 0b9754ef40..029a503b4b 100644 --- a/pkg/webhook/webhook_integration_test.go +++ b/pkg/webhook/webhook_integration_test.go @@ -36,6 +36,7 @@ import ( "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/certwatcher" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/internal/httpserver" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/webhook" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" @@ -185,7 +186,7 @@ var _ = Describe("Webhook", func() { http.Handle("/failing", hook) By("running the http server") - srv := &http.Server{} + srv := httpserver.New(nil) go func() { idleConnsClosed := make(chan struct{}) go func() {