Skip to content

Commit

Permalink
Add controller workqueue option
Browse files Browse the repository at this point in the history
Signed-off-by: Stefan Büringer buringerst@vmware.com
  • Loading branch information
sbueringer committed Apr 11, 2024
1 parent 20f3f4b commit 6e23721
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 19 deletions.
29 changes: 23 additions & 6 deletions pkg/controller/controller.go
Expand Up @@ -59,6 +59,18 @@ type Options struct {
// The overall is a token bucket and the per-item is exponential.
RateLimiter ratelimiter.RateLimiter

// NewQueue constructs the queue for this controller once the controller is ready to start.
// With NewQueue a custom queue implementation can be used, e.g. a priority queue to prioritize with which
// priority/order objects are reconciled (e.g. to reconcile objects with changes first).
// This is a func because the standard Kubernetes work queues start themselves immediately, which
// leads to goroutine leaks if something calls controller.New repeatedly.
// The NewQueue func gets the controller name and the RateLimiter option (defaulted if necessary) passed in.
// NewQueue defaults to NewRateLimitingQueueWithConfig.
//
// NOTE: LOW LEVEL PRIMITIVE!
// Only use a custom NewQueue if you know what you are doing.
NewQueue func(controllerName string, rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface

// LogConstructor is used to construct a logger used for this controller and passed
// to each reconciliation via the context field.
LogConstructor func(request *reconcile.Request) logr.Logger
Expand Down Expand Up @@ -147,6 +159,14 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
options.RateLimiter = workqueue.DefaultControllerRateLimiter()
}

if options.NewQueue == nil {
options.NewQueue = func(controllerName string, rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface {
return workqueue.NewRateLimitingQueueWithConfig(rateLimiter, workqueue.RateLimitingQueueConfig{
Name: controllerName,
})
}
}

if options.RecoverPanic == nil {
options.RecoverPanic = mgr.GetControllerOptions().RecoverPanic
}
Expand All @@ -157,12 +177,9 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller

// Create controller with dependencies set
return &controller.Controller{
Do: options.Reconciler,
MakeQueue: func() workqueue.RateLimitingInterface {
return workqueue.NewRateLimitingQueueWithConfig(options.RateLimiter, workqueue.RateLimitingQueueConfig{
Name: name,
})
},
Do: options.Reconciler,
RateLimiter: options.RateLimiter,
NewQueue: options.NewQueue,
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
CacheSyncTimeout: options.CacheSyncTimeout,
Name: name,
Expand Down
44 changes: 44 additions & 0 deletions pkg/controller/controller_test.go
Expand Up @@ -24,6 +24,7 @@ import (
. "github.com/onsi/gomega"
"go.uber.org/goleak"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/ptr"

"sigs.k8s.io/controller-runtime/pkg/config"
Expand All @@ -32,6 +33,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/handler"
internalcontroller "sigs.k8s.io/controller-runtime/pkg/internal/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)
Expand Down Expand Up @@ -133,6 +135,48 @@ var _ = Describe("controller.Controller", func() {
Eventually(func() error { return goleak.Find(currentGRs) }).Should(Succeed())
})

It("should default RateLimiter and NewQueue if not specified", func() {
m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

c, err := controller.New("new-controller", m, controller.Options{
Reconciler: reconcile.Func(nil),
})
Expect(err).NotTo(HaveOccurred())

ctrl, ok := c.(*internalcontroller.Controller)
Expect(ok).To(BeTrue())

Expect(ctrl.RateLimiter).NotTo(BeNil())
Expect(ctrl.NewQueue).NotTo(BeNil())
})

It("should not override RateLimiter and NewQueue if specified", func() {
m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

customRateLimiter := workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second)
customNewQueueCalled := false
customNewQueue := func(controllerName string, rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface {
customNewQueueCalled = true
return nil
}

c, err := controller.New("new-controller", m, controller.Options{
Reconciler: reconcile.Func(nil),
RateLimiter: customRateLimiter,
NewQueue: customNewQueue,
})
Expect(err).NotTo(HaveOccurred())

ctrl, ok := c.(*internalcontroller.Controller)
Expect(ok).To(BeTrue())

Expect(ctrl.RateLimiter).To(BeIdenticalTo(customRateLimiter))
ctrl.NewQueue("controller1", nil)
Expect(customNewQueueCalled).To(BeTrue(), "Expected customNewQueue to be called")
})

It("should default RecoverPanic from the manager", func() {
m, err := manager.New(cfg, manager.Options{Controller: config.Controller{RecoverPanic: ptr.To(true)}})
Expect(err).NotTo(HaveOccurred())
Expand Down
12 changes: 8 additions & 4 deletions pkg/internal/controller/controller.go
Expand Up @@ -33,6 +33,7 @@ import (
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)
Expand All @@ -50,10 +51,13 @@ type Controller struct {
// Defaults to the DefaultReconcileFunc.
Do reconcile.Reconciler

// MakeQueue constructs the queue for this controller once the controller is ready to start.
// This exists because the standard Kubernetes workqueues start themselves immediately, which
// RateLimiter is used to limit how frequently requests may be queued into the work queue.
RateLimiter ratelimiter.RateLimiter

// NewQueue constructs the queue for this controller once the controller is ready to start.
// This is a func because the standard Kubernetes work queues start themselves immediately, which
// leads to goroutine leaks if something calls controller.New repeatedly.
MakeQueue func() workqueue.RateLimitingInterface
NewQueue func(controllerName string, rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface

// Queue is an listeningQueue that listens for events from Informers and adds object keys to
// the Queue for processing
Expand Down Expand Up @@ -158,7 +162,7 @@ func (c *Controller) Start(ctx context.Context) error {
// Set the internal context.
c.ctx = ctx

c.Queue = c.MakeQueue()
c.Queue = c.NewQueue(c.Name, c.RateLimiter)
go func() {
<-ctx.Done()
c.Queue.ShutDown()
Expand Down
19 changes: 10 additions & 9 deletions pkg/internal/controller/controller_test.go
Expand Up @@ -43,6 +43,7 @@ import (
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
"sigs.k8s.io/controller-runtime/pkg/internal/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)
Expand All @@ -68,7 +69,7 @@ var _ = Describe("controller", func() {
ctrl = &Controller{
MaxConcurrentReconciles: 1,
Do: fakeReconcile,
MakeQueue: func() workqueue.RateLimitingInterface { return queue },
NewQueue: func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return queue },
LogConstructor: func(_ *reconcile.Request) logr.Logger {
return log.RuntimeLog.WithName("controller").WithName("test")
},
Expand Down Expand Up @@ -408,8 +409,8 @@ var _ = Describe("controller", func() {
// TODO(directxman12): we should ensure that backoff occurrs with error requeue

It("should not reset backoff until there's a non-error result", func() {
dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()}
ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq }
dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewQueue("controller1", nil)}
ctrl.NewQueue = func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq }

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -444,8 +445,8 @@ var _ = Describe("controller", func() {
})

It("should requeue a Request with rate limiting if the Result sets Requeue:true and continue processing items", func() {
dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()}
ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq }
dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewQueue("controller1", nil)}
ctrl.NewQueue = func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq }

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -474,8 +475,8 @@ var _ = Describe("controller", func() {
})

It("should requeue a Request after a duration (but not rate-limitted) if the Result sets RequeueAfter (regardless of Requeue)", func() {
dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()}
ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq }
dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewQueue("controller1", nil)}
ctrl.NewQueue = func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq }

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -504,8 +505,8 @@ var _ = Describe("controller", func() {
})

It("should perform error behavior if error is not nil, regardless of RequeueAfter", func() {
dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()}
ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq }
dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewQueue("controller1", nil)}
ctrl.NewQueue = func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq }

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down

0 comments on commit 6e23721

Please sign in to comment.