Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Add controller workqueue option #2767

Merged
merged 1 commit into from Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 23 additions & 6 deletions pkg/controller/controller.go
Expand Up @@ -59,6 +59,18 @@ type Options struct {
// The overall is a token bucket and the per-item is exponential.
RateLimiter ratelimiter.RateLimiter

// NewQueue constructs the queue for this controller once the controller is ready to start.
// With NewQueue a custom queue implementation can be used, e.g. a priority queue to prioritize with which
// priority/order objects are reconciled (e.g. to reconcile objects with changes first).
// This is a func because the standard Kubernetes work queues start themselves immediately, which
// leads to goroutine leaks if something calls controller.New repeatedly.
// The NewQueue func gets the controller name and the RateLimiter option (defaulted if necessary) passed in.
// NewQueue defaults to NewRateLimitingQueueWithConfig.
//
// NOTE: LOW LEVEL PRIMITIVE!
// Only use a custom NewQueue if you know what you are doing.
NewQueue func(controllerName string, rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface

// LogConstructor is used to construct a logger used for this controller and passed
// to each reconciliation via the context field.
LogConstructor func(request *reconcile.Request) logr.Logger
Expand Down Expand Up @@ -147,6 +159,14 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
options.RateLimiter = workqueue.DefaultControllerRateLimiter()
}

if options.NewQueue == nil {
options.NewQueue = func(controllerName string, rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface {
return workqueue.NewRateLimitingQueueWithConfig(rateLimiter, workqueue.RateLimitingQueueConfig{
Name: controllerName,
})
}
}

if options.RecoverPanic == nil {
options.RecoverPanic = mgr.GetControllerOptions().RecoverPanic
}
Expand All @@ -157,12 +177,9 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller

// Create controller with dependencies set
return &controller.Controller{
Do: options.Reconciler,
MakeQueue: func() workqueue.RateLimitingInterface {
return workqueue.NewRateLimitingQueueWithConfig(options.RateLimiter, workqueue.RateLimitingQueueConfig{
Name: name,
})
},
Do: options.Reconciler,
RateLimiter: options.RateLimiter,
NewQueue: options.NewQueue,
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
CacheSyncTimeout: options.CacheSyncTimeout,
Name: name,
Expand Down
44 changes: 44 additions & 0 deletions pkg/controller/controller_test.go
Expand Up @@ -24,6 +24,7 @@ import (
. "github.com/onsi/gomega"
"go.uber.org/goleak"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/ptr"

"sigs.k8s.io/controller-runtime/pkg/config"
Expand All @@ -32,6 +33,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/handler"
internalcontroller "sigs.k8s.io/controller-runtime/pkg/internal/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)
Expand Down Expand Up @@ -133,6 +135,48 @@ var _ = Describe("controller.Controller", func() {
Eventually(func() error { return goleak.Find(currentGRs) }).Should(Succeed())
})

It("should default RateLimiter and NewQueue if not specified", func() {
m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

c, err := controller.New("new-controller", m, controller.Options{
Reconciler: reconcile.Func(nil),
})
Expect(err).NotTo(HaveOccurred())

ctrl, ok := c.(*internalcontroller.Controller)
Expect(ok).To(BeTrue())

Expect(ctrl.RateLimiter).NotTo(BeNil())
Expect(ctrl.NewQueue).NotTo(BeNil())
})

It("should not override RateLimiter and NewQueue if specified", func() {
m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

customRateLimiter := workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second)
customNewQueueCalled := false
customNewQueue := func(controllerName string, rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface {
customNewQueueCalled = true
return nil
}

c, err := controller.New("new-controller", m, controller.Options{
Reconciler: reconcile.Func(nil),
RateLimiter: customRateLimiter,
NewQueue: customNewQueue,
})
Expect(err).NotTo(HaveOccurred())

ctrl, ok := c.(*internalcontroller.Controller)
Expect(ok).To(BeTrue())

Expect(ctrl.RateLimiter).To(BeIdenticalTo(customRateLimiter))
ctrl.NewQueue("controller1", nil)
Expect(customNewQueueCalled).To(BeTrue(), "Expected customNewQueue to be called")
})

It("should default RecoverPanic from the manager", func() {
m, err := manager.New(cfg, manager.Options{Controller: config.Controller{RecoverPanic: ptr.To(true)}})
Expect(err).NotTo(HaveOccurred())
Expand Down
12 changes: 8 additions & 4 deletions pkg/internal/controller/controller.go
Expand Up @@ -33,6 +33,7 @@ import (
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)
Expand All @@ -50,10 +51,13 @@ type Controller struct {
// Defaults to the DefaultReconcileFunc.
Do reconcile.Reconciler

// MakeQueue constructs the queue for this controller once the controller is ready to start.
// This exists because the standard Kubernetes workqueues start themselves immediately, which
// RateLimiter is used to limit how frequently requests may be queued into the work queue.
RateLimiter ratelimiter.RateLimiter

// NewQueue constructs the queue for this controller once the controller is ready to start.
// This is a func because the standard Kubernetes work queues start themselves immediately, which
// leads to goroutine leaks if something calls controller.New repeatedly.
MakeQueue func() workqueue.RateLimitingInterface
NewQueue func(controllerName string, rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface

// Queue is an listeningQueue that listens for events from Informers and adds object keys to
// the Queue for processing
Expand Down Expand Up @@ -158,7 +162,7 @@ func (c *Controller) Start(ctx context.Context) error {
// Set the internal context.
c.ctx = ctx

c.Queue = c.MakeQueue()
c.Queue = c.NewQueue(c.Name, c.RateLimiter)
go func() {
<-ctx.Done()
c.Queue.ShutDown()
Expand Down
19 changes: 10 additions & 9 deletions pkg/internal/controller/controller_test.go
Expand Up @@ -43,6 +43,7 @@ import (
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
"sigs.k8s.io/controller-runtime/pkg/internal/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)
Expand All @@ -68,7 +69,7 @@ var _ = Describe("controller", func() {
ctrl = &Controller{
MaxConcurrentReconciles: 1,
Do: fakeReconcile,
MakeQueue: func() workqueue.RateLimitingInterface { return queue },
NewQueue: func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return queue },
LogConstructor: func(_ *reconcile.Request) logr.Logger {
return log.RuntimeLog.WithName("controller").WithName("test")
},
Expand Down Expand Up @@ -408,8 +409,8 @@ var _ = Describe("controller", func() {
// TODO(directxman12): we should ensure that backoff occurrs with error requeue

It("should not reset backoff until there's a non-error result", func() {
dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()}
ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq }
dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewQueue("controller1", nil)}
ctrl.NewQueue = func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq }

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -444,8 +445,8 @@ var _ = Describe("controller", func() {
})

It("should requeue a Request with rate limiting if the Result sets Requeue:true and continue processing items", func() {
dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()}
ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq }
dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewQueue("controller1", nil)}
ctrl.NewQueue = func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq }

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -474,8 +475,8 @@ var _ = Describe("controller", func() {
})

It("should requeue a Request after a duration (but not rate-limitted) if the Result sets RequeueAfter (regardless of Requeue)", func() {
dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()}
ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq }
dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewQueue("controller1", nil)}
ctrl.NewQueue = func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq }

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -504,8 +505,8 @@ var _ = Describe("controller", func() {
})

It("should perform error behavior if error is not nil, regardless of RequeueAfter", func() {
dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()}
ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq }
dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewQueue("controller1", nil)}
ctrl.NewQueue = func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq }

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down